Reproduced from stackoverflow.com.
Tags: java, java-stream, out-of-memory, lazy-evaluation

Parallel Infinite Java Streams run out of Memory

Posted on 2020-03-31 22:53:26

I'm trying to understand why the following Java program gives an OutOfMemoryError, while the corresponding program without .parallel() doesn't.

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

I have two questions:

  1. What is the intended output of this program?

    Without .parallel() it seems that this simply outputs sum(1+2+3+...), i.e. it "gets stuck" at the first stream produced inside the flatMap, which makes sense.

    With .parallel() I don't know if there is an expected behaviour, but my guess would be that it somehow interleaves the first n or so streams, where n is the number of parallel workers. It could also be slightly different based on the chunking/buffering behaviour.

  2. What causes it to run out of memory? I'm specifically trying to understand how these streams are implemented under the hood.

    I'm guessing that something blocks the stream, so it never finishes and never gets to discard the generated values, but I don't quite know in which order things are evaluated and where buffering occurs.

Edit: In case it is relevant, I'm using Java 11.

Edit 2: Apparently the same thing happens even for the simple program IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), so it might have to do with the laziness of limit rather than flatMap.
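
For reference, here is the reproducer from Edit 2 written out as a self-contained program (the class wrapper, its name and the import are additions for illustration; the pipeline itself is exactly the one quoted above, which reportedly also ends in an OutOfMemoryError):

import java.util.stream.IntStream;

public class LimitRepro {
    public static void main(String[] args) {
        // Same pipeline as in Edit 2: an infinite, unsized source plus limit() and parallel().
        System.out.println(IntStream
            .iterate(1, i -> i + 1)
            .limit(1000_000_000)
            .parallel()
            .sum()
        );
    }
}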

Questioner: Thomas Ahle

Answer:

You say “but I don't quite know in which order things are evaluated and where buffering occurs”, which is precisely what parallel streams are about. The order of evaluation is unspecified.

A critical aspect of your example is the .limit(100_000_000). This implies that the implementation can’t just sum up arbitrary values, but must sum up the first 100,000,000 numbers. Note that in the reference implementation, .unordered().limit(100_000_000) doesn’t change the outcome, which indicates that there’s no special implementation for the unordered case, but that’s an implementation detail.
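
For concreteness, this is where the .unordered() call mentioned above would sit in the questioner's pipeline (only that one added line differs from the original snippet); per the observation above, it still runs out of memory in the reference implementation:

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .unordered()   // declaring the pipeline unordered does not change the outcome here
    .limit(100_000_000)
    .sum()
);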

Now, when worker threads process the elements, they can’t just sum them up, as they have to know which elements they are allowed to consume, which depends on how many elements precede their specific workload. Since this stream doesn’t know the sizes, this can only be known when the prefix elements have been processed, which never happens for infinite streams. So the worker threads keep buffering until this information becomes available.
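
As a contrast (a minimal sketch added here, not part of the original answer): with a source that knows its size, such as LongStream.rangeClosed, every chunk’s position and size are known up front, so the limit can be applied positionally and the workers can combine partial sums without unbounded buffering; on my understanding this terminates quickly:

import java.util.stream.LongStream;

public class SizedContrast {
    public static void main(String[] args) {
        // A SIZED/SUBSIZED source: each split knows exactly where it sits in the
        // encounter order, so no worker has to buffer while waiting to learn how
        // many elements precede it.
        long sum = LongStream.rangeClosed(1, 200_000_000)
            .parallel()
            .limit(100_000_000)
            .sum();
        System.out.println(sum);   // 5000000050000000
    }
}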

In principle, when a worker thread knows that it processes the leftmost¹ work-chunk, it could sum up the elements immediately, count them, and signal the end when reaching the limit. So the Stream could terminate, but this depends on a lot of factors.

In your case, a plausible scenario is that the other worker threads are faster at allocating buffers than the leftmost job is at counting. In this scenario, subtle changes to the timing could make the stream occasionally return with a value.

When we slow down all worker threads except the one processing the leftmost chunk, we can make the stream terminate (at least in most runs):

// uses java.util.stream.IntStream and java.util.concurrent.locks.LockSupport
System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    // delay every element except 1, i.e. slow down all workers except the one processing the leftmost chunk
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ I’m following a suggestion by Stuart Marks to use left-to-right order when talking about the encounter order rather than the processing order.