温馨提示:本文翻译自stackoverflow.com,查看原文请点击:其他 - Parallel Infinite Java Streams run out of Memory

其他 - 并行无限Java流内存不足

发布于 2020-03-31 23:08:14

我想理解为什么下面的Java程序给出一个OutOfMemoryError,而没有.parallel()的相应程序没有给出

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

我有两个问题:

  1. 该程序的预期输出是什么?

    如果没有,.parallel()这似乎只是输出sum(1+2+3+...),这意味着它只是“卡住”了flatMap的第一个流,这是有道理的。

    对于并行,我不知道是否有预期的行为,但是我的猜测是它以某种方式交错了第一个n左右的流,n并行工作者的数量在哪里根据组块/缓冲行为,它也可能略有不同。

  2. 是什么导致它的内存不足?我正在专门尝试了解如何在后台实现这些流。

    我猜有些东西阻塞了流,所以它永远不会完成,并且能够摆脱生成的值,但是我不太清楚事物的评估顺序和缓冲发生的位置。

编辑:如果相关,我正在使用Java 11。

Editt 2:即使对于简单的程序IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()显然也会发生相同的事情,因此它可能与limit而不是的懒惰有关flatMap

查看更多

提问者
Thomas Ahle
被浏览
24
65.6k 2020-01-31 21:46

您说“ 但我不太清楚事物的评估顺序和发生缓冲的位置 ”,这正是并行流的含义。评估顺序不确定。

您的示例的一个关键方面是.limit(100_000_000)这意味着该实现不仅可以求和任意值,还必须求和前100,000,000个数字。请注意,在参考实现中,.unordered().limit(100_000_000)不会更改结果,这表明无序情况没有特殊的实现,但这只是实现细节。

现在,当工作线程处理这些元素时,它们不能仅仅对其进行汇总,因为它们必须知道允许使用哪些元素,这取决于在其特定工作负荷之前有多少个元素。由于此流不知道大小,因此只有在处理了前缀元素后才能知道,而对于无限流则永远不会发生。因此,工作线程暂时保持缓冲状态,此信息变为可用。

原则上,当工作线程知道它正在处理最左边的工作块时,它可以立即求和,计数并在达到极限时发出结束信号。因此,Stream可能会终止,但这取决于很多因素。

在您的情况下,一个合理的情况是其他工作线程在分配缓冲区时比最左边的作业要快。在这种情况下,时间上的细微变化可能会使流偶尔返回一个值。

当我们减慢所有工作线程(除了处理最左边的块的那个线程之外)时,我们可以使流终止(至少在大多数运行中):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹我遵循Stuart Marks的建议,在谈论相遇顺序而不是处理顺序时,请使用从左到右的顺序。

发布
问题

分享
好友

手机
浏览

扫码手机浏览