我想理解为什么下面的Java程序给出一个OutOfMemoryError
,而没有.parallel()
它的相应程序没有给出。
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
我有两个问题:
该程序的预期输出是什么?
如果没有,.parallel()
这似乎只是输出sum(1+2+3+...)
,这意味着它只是“卡住”了flatMap的第一个流,这是有道理的。
对于并行,我不知道是否有预期的行为,但是我的猜测是它以某种方式交错了第一个n
左右的流,n
并行工作者的数量在哪里。根据组块/缓冲行为,它也可能略有不同。
是什么导致它的内存不足?我正在专门尝试了解如何在后台实现这些流。
我猜有些东西阻塞了流,所以它永远不会完成,并且能够摆脱生成的值,但是我不太清楚事物的评估顺序和缓冲发生的位置。
编辑:如果相关,我正在使用Java 11。
Editt 2:即使对于简单的程序IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()
,显然也会发生相同的事情,因此它可能与limit
而不是的懒惰有关flatMap
。
您说“ 但我不太清楚事物的评估顺序和发生缓冲的位置 ”,这正是并行流的含义。评估顺序不确定。
您的示例的一个关键方面是.limit(100_000_000)
。这意味着该实现不仅可以求和任意值,还必须求和前100,000,000个数字。请注意,在参考实现中,.unordered().limit(100_000_000)
不会更改结果,这表明无序情况没有特殊的实现,但这只是实现细节。
现在,当工作线程处理这些元素时,它们不能仅仅对其进行汇总,因为它们必须知道允许使用哪些元素,这取决于在其特定工作负荷之前有多少个元素。由于此流不知道大小,因此只有在处理了前缀元素后才能知道,而对于无限流则永远不会发生。因此,工作线程暂时保持缓冲状态,此信息变为可用。
原则上,当工作线程知道它正在处理最左边的工作块时,它可以立即求和,计数并在达到极限时发出结束信号。因此,Stream可能会终止,但这取决于很多因素。
在您的情况下,一个合理的情况是其他工作线程在分配缓冲区时比最左边的作业要快。在这种情况下,时间上的细微变化可能会使流偶尔返回一个值。
当我们减慢所有工作线程(除了处理最左边的块的那个线程之外)时,我们可以使流终止(至少在大多数运行中):
System.out.println(IntStream
.iterate(1, i -> i+1)
.parallel()
.peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
.flatMap(n -> IntStream.iterate(n, i -> i+n))
.limit(100_000_000)
.sum()
);
¹我遵循Stuart Marks的建议,在谈论相遇顺序而不是处理顺序时,请使用从左到右的顺序。
很好的答案!我想知道是否所有的线程都开始运行flatMap操作,而没有一个分配实际清空缓冲区(求和)的风险?在我的实际用例中,无限流是文件太大而无法保存在内存中。我想知道如何重写流以降低内存使用量吗?
您正在使用
Files.lines(…)
吗?在Java 9中已对其进行了重大改进。这就是在Java 8中所做的事情。在较新的JRE中,
BufferedReader.lines()
在某些情况下(它不是默认的文件系统,特殊的字符集或大于的大小Integer.MAX_FILES
),它仍然会使用。如果其中一种适用,那么定制解决方案可能会有所帮助。值得进行新的问答。Integer.MAX_VALUE
, 当然…什么是外部流,文件流?它有可预测的大小吗?