Reputation: 91
The goal is to process a continuous stream of elements with the help of Java 8 streams. Therefore, elements are added to the data source of a parallel stream while processing that stream.
The Javadoc of Streams describes the following properties in section "Non-interference":
For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception to this are streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic.
That is the reason a ConcurrentLinkedQueue is used in our attempts, which returns true for
new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)
It is not explicitly stated that the data source must not be modified when used in parallel streams.
In our example, for each element in the stream, the incremented counter value is added to the queue, which is the data source of the stream, until the counter is bigger than N. When calling queue.stream(), everything works fine with sequential execution:
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testSequential(N));
    }

    public static int testSequential(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        Stream<Integer> stream = queue.stream();
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}
In the second attempt the stream is parallel, and the test throws a java.lang.AssertionError because check is smaller than N, i.e. not every element in the queue was processed. The stream may have finished execution early because the queue may have become empty at some point in time.
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testParallel1(N));
    }

    public static int testParallel1(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}
The next attempt was to signal the main thread once the continuous stream ‘really’ ended (the queue is empty) and to close the stream object afterwards. Here the problem is that the stream object appears to read elements from the queue only once, or at least not continuously, and never reaches the ‘real’ end of the stream.
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        try {
            assertEquals(N, testParallel2(N));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static int testParallel2(int N) throws InterruptedException {
        final Lock lock = new ReentrantLock();
        final Condition cond = lock.newCondition();
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            lock.lock();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            } else {
                cond.signal();
            }
            lock.unlock();
        });
        lock.lock();
        while (check.get() < N) {
            cond.await();
        }
        lock.unlock();
        stream.close();
        return check.get();
    }
}
The questions arising thereby are:
Upvotes: 9
Views: 2008
Reputation: 533462
A Stream can be generated continuously or created from a collection that is modified, but it is not designed to run continuously on such a collection. It is designed to process the elements available when the stream is started and to return once these have been processed. As soon as the end is reached, it stops.
How can we achieve the desired behavior otherwise?
You need to use a different approach. I would use an ExecutorService to which you submit the tasks you want to perform.
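A minimal sketch of that idea, reusing the counter/check logic from the question: every element becomes a task, and a task may submit follow-up tasks. The class name ExecutorSketch, the fixed pool of 4 threads, and the pending counter combined with a CountDownLatch to detect the 'real' end are illustrative assumptions, not the only way to do it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class; names and pool size are assumptions for illustration.
final class ExecutorSketch {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final AtomicInteger counter = new AtomicInteger();
    private final AtomicInteger check = new AtomicInteger();
    // Tasks submitted but not yet finished; starts at 1 as a token held by the caller.
    private final AtomicInteger pending = new AtomicInteger(1);
    private final CountDownLatch done = new CountDownLatch(1);
    private final int n;

    private ExecutorSketch(int n) {
        this.n = n;
    }

    private void submit(int element) {
        pending.incrementAndGet();          // count the task before it can run
        pool.execute(() -> handle(element));
    }

    private void handle(int element) {
        try {
            check.incrementAndGet();
            int j = counter.incrementAndGet();
            if (j <= n) {
                submit(j);                  // a task may create follow-up work
            }
        } finally {
            release();                      // this task is finished
        }
    }

    private void release() {
        if (pending.decrementAndGet() == 0) {
            done.countDown();               // nothing running and nothing queued
        }
    }

    // Processes n/10 seed elements plus everything they generate; returns the number processed.
    static int process(int n) {
        ExecutorSketch sketch = new ExecutorSketch(n);
        int seeds = n / 10;
        sketch.counter.set(seeds);          // seeds are numbered 1..seeds
        for (int i = 1; i <= seeds; i++) {
            sketch.submit(i);
        }
        sketch.release();                   // give up the caller's token
        try {
            sketch.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            sketch.pool.shutdown();
        }
        return sketch.check.get();
    }

    public static void main(String[] args) {
        System.out.println(process(10000));
    }
}
```

The caller's token keeps pending above zero while the seed elements are still being submitted, so the latch cannot open before all initial work has been queued.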
An alternative would be to use a continuous stream which blocks when there is no result available. Note: this will tie up the common ForkJoinPool used by parallel streams, so no other code will be able to use it.
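A rough sketch of such a blocking stream, kept sequential on purpose so that it does not occupy the common pool. It assumes a LinkedBlockingQueue fed by a separate producer thread; the class name BlockingStreamSketch, the take helper, and the limit(...) that makes the example terminate are all illustrative.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class; names are assumptions for illustration.
final class BlockingStreamSketch {

    // Blocks until an element is available; wraps the checked exception
    // because Supplier.get() cannot throw it.
    static <T> T take(BlockingQueue<T> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Consumes the first howMany elements that a producer thread puts into the queue.
    static List<Integer> firstElements(int howMany) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 2 * howMany; i++) {
                queue.add(i);
            }
        });
        producer.setDaemon(true);
        producer.start();

        Supplier<Integer> blockingSource = () -> take(queue);
        return Stream.generate(blockingSource)  // infinite stream, waits for new elements
                .limit(howMany)                 // only here so that the sketch ends
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstElements(5));
    }
}
```

Without the limit(...) the pipeline never returns, which is exactly why running it as a parallel stream would keep the shared pool's threads busy.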
Upvotes: 0
Reputation: 298103
There is a significant difference between “modifying the source of the Stream does not break it” and your assumption “modifications will be reflected by the ongoing Stream operation”.
The CONCURRENT property implies that the modification of the source is permitted, i.e. that it will never throw a ConcurrentModificationException, but it does not imply that you can rely on a specific behavior regarding whether these changes are reflected or not.
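To illustrate the “permitted” part only, here is a small sketch that appends a few elements to the source while a sequential stream over it is running. The class and method names are made up for the example, and it deliberately makes no claim about whether the added elements are seen by the stream.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper class; names are assumptions for illustration.
final class ConcurrentSourceSketch {

    // Streams over the source and appends up to three extra elements while the
    // pipeline is running; reports whether the source tolerated the modification.
    static boolean toleratesModification(Collection<Integer> source) {
        source.add(1);
        source.add(2);
        source.add(3);
        try {
            source.stream().forEach(i -> {
                if (source.size() < 6) {    // bounded, so the traversal always ends
                    source.add(i + 10);
                }
            });
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(new ConcurrentLinkedQueue<Integer>().spliterator()
                .hasCharacteristics(Spliterator.CONCURRENT));
        System.out.println(toleratesModification(new ConcurrentLinkedQueue<>()));
        System.out.println(toleratesModification(new ArrayList<>()));
    }
}
```

The ConcurrentLinkedQueue accepts the additions silently, whereas the ArrayList stream fails with a ConcurrentModificationException; neither outcome tells you which elements the stream will actually process.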
The documentation of the CONCURRENT flag itself says:
Most concurrent collections maintain a consistency policy guaranteeing accuracy with respect to elements present at the point of Spliterator construction, but possibly not reflecting subsequent additions or removals.
This Stream behavior is consistent with the already known behavior of ConcurrentLinkedQueue:
Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
It’s hard to say how to “achieve the desired behavior otherwise”, as you didn’t describe the “desired behavior” in any form other than code, which can simply be replaced with
public static int testSequential(int N) {
    return N;
}
public static int testParallel1(int N) {
    return N;
}
as that’s the only observable effect… Consider redefining your problem…
Upvotes: 2