
Adding elements to Java 8 parallel Streams on-the-fly


There is a significant difference between “modifying the source of the Stream does not break it” and your assumption “modifications will be reflected by the ongoing Stream operation”.

The CONCURRENT property implies that the modification of the source is permitted, i.e. that it will never throw a ConcurrentModificationException, but it does not imply that you can rely on a specific behavior regarding whether these changes are reflected or not.

The documentation of the CONCURRENT flag itself says:

Most concurrent collections maintain a consistency policy guaranteeing accuracy with respect to elements present at the point of Spliterator construction, but possibly not reflecting subsequent additions or removals.
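To see which sources report this flag at all, you can query the Spliterator directly. This is only a small sketch; the class and method names (ConcurrentFlagDemo, isConcurrent) are made up for illustration. Note that the flag tells you modification is tolerated, not that later additions will be seen.

```java
import java.util.ArrayList;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

class ConcurrentFlagDemo {
    // Hypothetical helper: true if the spliterator reports the CONCURRENT characteristic.
    static boolean isConcurrent(Spliterator<?> spliterator) {
        return spliterator.hasCharacteristics(Spliterator.CONCURRENT);
    }

    public static void main(String[] args) {
        // ConcurrentLinkedQueue documents its spliterator as CONCURRENT.
        System.out.println(isConcurrent(new ConcurrentLinkedQueue<Integer>().spliterator())); // true
        // ArrayList's spliterator is not CONCURRENT.
        System.out.println(isConcurrent(new ArrayList<Integer>().spliterator())); // false
    }
}
```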

This Stream behavior is consistent with the already known behavior of ConcurrentLinkedQueue:

Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
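A rough illustration of what "weakly consistent" means in practice, assuming a made-up helper iterateWhileAdding: adding to a ConcurrentLinkedQueue during iteration does not throw, and the elements present at iterator creation are all visited, but whether the added element shows up is not something to rely on. An ArrayList fails fast in the same situation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.concurrent.ConcurrentLinkedQueue;

class WeakConsistencyDemo {
    // Hypothetical helper: iterates over the collection and adds one element
    // during the first step. Returns the number of elements visited,
    // or -1 if the iterator threw ConcurrentModificationException.
    static int iterateWhileAdding(Collection<Integer> source) {
        int visited = 0;
        boolean added = false;
        try {
            for (Integer ignored : source) {
                visited++;
                if (!added) {
                    source.add(99);
                    added = true;
                }
            }
        } catch (ConcurrentModificationException e) {
            return -1;
        }
        return visited;
    }

    public static void main(String[] args) {
        // Weakly consistent: no exception, at least the 3 original elements are visited.
        int queueVisited = iterateWhileAdding(new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3)));
        System.out.println("queue visited " + queueVisited + " elements");
        // Fail-fast: the modification is detected on the next step.
        System.out.println("list result: " + iterateWhileAdding(new ArrayList<>(Arrays.asList(1, 2, 3))));
    }
}
```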

It’s hard to say how to “achieve the desired behavior otherwise”, as you didn’t describe the “desired behavior” in any form other than code, which can simply be replaced with

    public static int testSequential(int N) {
        return N;
    }

    public static int testParallel1(int N) {
        return N;
    }

as that’s the only observable effect… Consider redefining your problem.


A Stream is not meant to be generated continuously from a collection that is modified while the stream runs, nor is it designed to run continuously. It is designed to process the elements available when the stream is started and to return once these have been processed. As soon as the end is reached, it stops.

How can we achieve the desired behavior otherwise?

You need to use a different approach. I would use an ExecutorService to which you submit the tasks you want to perform.
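A minimal sketch of that idea, assuming each processed item may produce follow-up work: every task can hand new tasks to the same pool, and a counter of pending tasks tells you when everything has finished. The class name, the "n produces n - 1" rule, and the pool size of 4 are all invented for the example; your real work would go where the counter is incremented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class DynamicWorkDemo {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final AtomicInteger pending = new AtomicInteger();   // tasks scheduled but not finished
    private final AtomicInteger processed = new AtomicInteger(); // tasks that have run
    private final CountDownLatch done = new CountDownLatch(1);

    // Hypothetical unit of work: handling n creates follow-up work n - 1 until n reaches 0.
    private void schedule(int n) {
        pending.incrementAndGet(); // count the task before it is handed to the pool
        pool.execute(() -> {
            try {
                processed.incrementAndGet();
                if (n > 0) {
                    schedule(n - 1); // add more work on the fly
                }
            } finally {
                // The follow-up task was counted above, so 0 means nothing is left.
                if (pending.decrementAndGet() == 0) {
                    done.countDown();
                }
            }
        });
    }

    // Runs the chain starting at 'start' and returns how many tasks were processed.
    static int run(int start) {
        DynamicWorkDemo demo = new DynamicWorkDemo();
        demo.schedule(start);
        try {
            demo.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            demo.pool.shutdown();
        }
        return demo.processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // 6 tasks: 5, 4, 3, 2, 1, 0
    }
}
```

Unlike a Stream, the pool has no notion of "the end of the source", which is why the sketch tracks completion itself.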

An alternative would be to use a continuous stream which blocks when there is no result available. Note: this will lock up the common ForkJoinPool used by parallel streams, so no other code will be able to use it.
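For what it's worth, here is a sketch of such a blocking stream, kept sequential on purpose so that it blocks only the calling thread rather than the common pool. It assumes a BlockingQueue as the hand-off and a sentinel value (POISON, chosen arbitrarily here) to signal the end; both are my assumptions, not something from the question.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class BlockingStreamDemo {
    // Hypothetical end-of-input marker; real values must never equal it.
    static final int POISON = Integer.MIN_VALUE;

    // Blocks until an element is available.
    private static int take(BlockingQueue<Integer> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Sums elements as they arrive and returns once POISON is received.
    static int sumUntilPoison(BlockingQueue<Integer> queue) {
        AtomicInteger sum = new AtomicInteger();
        IntStream.generate(() -> take(queue))
                 .peek(v -> { if (v != POISON) sum.addAndGet(v); })
                 .anyMatch(v -> v == POISON); // short-circuits the otherwise infinite stream
        return sum.get();
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        // Producer adds elements while the stream is already running.
        new Thread(() -> {
            queue.add(1);
            queue.add(2);
            queue.add(3);
            queue.add(POISON);
        }).start();
        System.out.println(sumUntilPoison(queue)); // 6
    }
}
```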