Reputation: 195
in the past I have written some java programs, using two threads. First thread (producer) was reading data from an API (C library), create a java object, send the object to the other thread. The C API is delivering an event stream (infinite). The threads are using a LinkedBlockingQueue as a pipeline to exchange the objects (put, poll). The second thread (consumer) is dealing with the object. (I also found that code is more readable within the threads. First thread is dealing with the C API stuff and producing proper java objects, second thread is free from C API handling and is dealing with the data).
Now I'm interested, how I can realize this scenario above with the new stream API coming in java 8. But assuming I want to keep the two threads (producer/consumer)! First thread is writing into the stream. Second thread is reading from the stream. I also hope, that I can handle with this technique a better explicit parallelism (producer/consumer) and within the stream I can use some implicit parallelism (e.g. stream.parallel()).
I don't have many experience with the new stream api. So I experimented with the following code below, to solve the idea above.
Questions:
Once more: the idea is to have an explicit parallelism with the threads. But internally I can deal with the new features and use parallel processing when possible
Thanks for breeding about this problem too.
package sandbox.test;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
public class MyStream {
private volatile LongStream stream = null;
private AtomicInteger producerCount = new AtomicInteger(0);
private AtomicInteger consumerCount = new AtomicInteger(0);
private AtomicInteger apiError = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
MyStream appl = new MyStream();
appl.create();
}
private static void sleep(long sleep) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void apiError(final String pos, final int iteration) {
RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
System.out.println(apiException.getMessage());
throw apiException;
}
final private int simulateErrorAfter = 10;
private Thread produce() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Producer started");
stream = LongStream.generate(() -> {
int localCount;
// Detect error, while using stream.parallel() processing
int error = apiError.get();
if ( error > 0 )
apiError("1", error);
// ----- Accessing the C API here -----
localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
// ----- Accessing the C API here -----
// Checking error code from C API
if ( localCount > simulateErrorAfter ) { // Simulate an API error
producerCount.decrementAndGet();
stream.close();
apiError("2", apiError.incrementAndGet());
}
System.out.println("P: " + localCount);
sleep(200L);
return localCount;
});
System.out.println("Producer terminated");
}
});
thread.start();
return thread;
}
private Thread consume() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
stream.onClose(new Runnable() {
@Override
public void run() {
System.out.println("Close detected");
}
}).parallel().forEach(l -> {
sleep(1000);
System.out.println("C: " + l);
consumerCount.incrementAndGet();
});
} catch (Exception e) {
// Capturing the stream end
System.out.println(e);
}
System.out.println("Consumer terminated");
}
});
thread.start();
return thread;
}
private void create() throws InterruptedException {
Thread producer = produce();
while ( stream == null )
sleep(10);
Thread consumer = consume();
producer.join();
consumer.join();
System.out.println("Produced: " + producerCount);
System.out.println("Consumed: " + consumerCount);
}
}
Upvotes: 3
Views: 531
Reputation: 298103
You need to understand some fundamental points about the Stream
API:
All operations applied on a stream are lazy and won’t do anything before the terminal operation will be applied. There is no sense in creating the stream using a “producer” thread as this thread won’t do anything. All actions are performed within your “consumer” thread and the background threads started by the Stream
implementation itself. The thread that created the Stream
instance is completely irrelevant
Closing a stream has no relevance for the Stream
operation itself, i.e. does not shut down threads. It is meant to release additional resources, e.g. closing the file associated with the stream returned by Files.lines(…)
. You can schedule such cleanup actions using onClose
and the Stream
will invoke them when you call close
but that’s it. For the Stream
class itself it has no meaning.
Stream
s do not model a scenario like “one thread is writing and another one is reading”. Their model is “one thread is calling your Supplier
, followed by calling your Consumer
and another thread does the same, and x other threads too…”
If you want to implement a producer/consumer scheme with distinct producer and consumer threads, you are better off using Thread
s or an ExecutorService
and a thread-safe queue.
But you still can use Java 8 features. E.g. there is no need to implement Runnable
s using inner classes; you can use lambda expression for them.
Upvotes: 3