ula.uvula

Reputation: 195

Mix explicit and implicit parallelism with java-8 streams

In the past I have written some Java programs that use two threads. The first thread (producer) reads data from an API (a C library), creates a Java object, and sends the object to the other thread. The C API delivers an (infinite) event stream. The threads use a LinkedBlockingQueue as a pipeline to exchange the objects (put, poll). The second thread (consumer) processes the objects. (I also find the code more readable when it is split across the threads: the first thread handles the C API and produces proper Java objects, while the second thread is free of C API handling and deals only with the data.)

Now I'm interested in how I can implement the scenario above with the new Stream API in Java 8, assuming I want to keep the two threads (producer/consumer). The first thread writes into the stream; the second thread reads from the stream. I also hope that this technique lets me combine explicit parallelism (producer/consumer) with some implicit parallelism within the stream (e.g. stream.parallel()).

I don't have much experience with the new Stream API, so I experimented with the code below to try out this idea.

Questions:

  1. Is 'generate' the best way in this scenario for the producer?
  2. I have trouble understanding how to terminate/close the stream in the producer if the API reports an error AND I want to shut down the whole pipeline. Do I use stream.close or throw an exception?
    • 2.1 I used stream.close(), but 'generate' keeps running after the close. The only way I found to terminate the generate part is to throw an exception. This exception propagates through the stream and reaches the consumer (this is fine for me; the consumer can recognize it and terminate). But in this case the producer has produced more than the consumer has processed by the time the exception arrives.
    • 2.2 If the consumer uses implicit parallelism via stream.parallel(), the producer processes many more items. I don't see any solution for this problem (accessing the C API, checking for errors, making a decision).
    • 2.3 The exception thrown in the producer arrives at the consumer, but not all inserted objects have been processed.

Once more: the idea is to have explicit parallelism with the threads, while internally I can use the new features and parallel processing where possible.

Thanks for thinking about this problem too.

package sandbox.test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class MyStream {
    private volatile LongStream stream = null;
    private AtomicInteger producerCount = new AtomicInteger(0);
    private AtomicInteger consumerCount = new AtomicInteger(0);
    private AtomicInteger apiError = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        MyStream appl = new MyStream();
        appl.create();
    }

    private static void sleep(long sleep) {
        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static void apiError(final String pos, final int iteration) {
        RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
        System.out.println(apiException.getMessage());
        throw apiException;
    }

    final private int simulateErrorAfter = 10;

    private Thread produce() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Producer started");
                stream = LongStream.generate(() -> {
                    int localCount;
                    // Detect error, while using stream.parallel() processing
                    int error = apiError.get();
                    if ( error > 0 )
                        apiError("1", error);
                    // ----- Accessing the C API here -----
                    localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
                    // ----- Accessing the C API here -----

                    // Checking error code from C API
                    if ( localCount > simulateErrorAfter ) { // Simulate an API error
                        producerCount.decrementAndGet();
                        stream.close();
                        apiError("2", apiError.incrementAndGet());
                    }
                    System.out.println("P: " + localCount);
                    sleep(200L);
                    return localCount;
                });
                System.out.println("Producer terminated");
            }
        });
        thread.start();
        return thread;
    }

    private Thread consume() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    stream.onClose(new Runnable() {
                        @Override
                        public void run() {
                            System.out.println("Close detected");
                        }
                    }).parallel().forEach(l -> {
                        sleep(1000);
                        System.out.println("C: " + l);
                        consumerCount.incrementAndGet();
                    });
                } catch (Exception e) {
                    // Capturing the stream end
                    System.out.println(e);
                }
                System.out.println("Consumer terminated");
            }
        });
        thread.start();
        return thread;
    }

    private void create() throws InterruptedException {
        Thread producer = produce();
        while ( stream == null )
            sleep(10);
        Thread consumer = consume();
        producer.join();
        consumer.join();
        System.out.println("Produced: " + producerCount);
        System.out.println("Consumed: " + consumerCount);
    }
}

Upvotes: 3

Views: 531

Answers (1)

Holger

Reputation: 298103

You need to understand some fundamental points about the Stream API:

  • All operations applied on a stream are lazy and won’t do anything until a terminal operation is applied. There is no point in creating the stream in a “producer” thread, as this thread won’t do anything. All actions are performed within your “consumer” thread and the background threads started by the Stream implementation itself. The thread that created the Stream instance is completely irrelevant.

  • Closing a stream has no relevance for the Stream operation itself, i.e. does not shut down threads. It is meant to release additional resources, e.g. closing the file associated with the stream returned by Files.lines(…). You can schedule such cleanup actions using onClose and the Stream will invoke them when you call close but that’s it. For the Stream class itself it has no meaning.

  • Streams do not model a scenario like “one thread is writing and another one is reading”. Their model is “one thread is calling your Supplier, followed by calling your Consumer and another thread does the same, and x other threads too…”

    If you want to implement a producer/consumer scheme with distinct producer and consumer threads, you are better off using Threads or an ExecutorService and a thread-safe queue.
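The first two points above can be observed with a small experiment. The following is only a minimal sketch: the class name LazyStreamDemo and the log strings are made up for illustration, and limit(2) is added just so the otherwise infinite stream terminates. It records the order of events in a list: nothing is generated while the pipeline is being built, the supplier only runs once the terminal forEach is called, and the onClose handler runs only when close() is called explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

public class LazyStreamDemo {

    // Builds a small sequential pipeline and records the order of events.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Building the pipeline does not call the supplier yet.
        LongStream stream = LongStream.generate(() -> {
                    log.add("supplier called");
                    return 1L;
                })
                .limit(2) // assumption: bound the infinite stream so the demo ends
                .onClose(() -> log.add("close handler ran"));
        log.add("pipeline built");

        // The terminal operation drives everything, in the calling thread.
        stream.forEach(l -> log.add("consumed"));

        // close() only runs the registered handlers; it stops nothing.
        stream.close();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch "pipeline built" is logged before any "supplier called" entry, which is why a separate thread that merely creates the stream does no producing at all.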

But you can still use Java 8 features. E.g. there is no need to implement Runnables using inner classes; you can use lambda expressions for them.
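As a rough illustration of that suggestion, here is a sketch of a producer/consumer pair built on an ExecutorService and a LinkedBlockingQueue, with both tasks written as lambdas. The class name QueuePipeline, the POISON end marker, and the counting loop standing in for the C API are all assumptions made for this example, not part of the original code. Using Long.MIN_VALUE as the marker also assumes the real data never contains that value.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class QueuePipeline {
    // Hypothetical end marker: sent by the producer on normal end or on error.
    private static final long POISON = Long.MIN_VALUE;

    // Runs one producer and one consumer; returns how many items were consumed.
    static long run(int itemCount) {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Producer task as a lambda; the loop stands in for the C API calls.
            Future<?> producer = pool.submit(() -> {
                try {
                    for (long i = 1; i <= itemCount; i++) {
                        queue.put(i);
                    }
                } finally {
                    // Always tell the consumer that no more items will come.
                    queue.put(POISON);
                }
                return null;
            });

            // Consumer task as a lambda; stops when it sees the end marker.
            Future<Long> consumer = pool.submit(() -> {
                long consumed = 0;
                while (true) {
                    long item = queue.take();
                    if (item == POISON) {
                        break;
                    }
                    consumed++;
                }
                return consumed;
            });

            producer.get();
            return consumer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + run(5));
    }
}
```

Because the end marker travels through the same queue as the data, the consumer sees every item that was enqueued before it, so nothing produced before an error is lost. If the consumer should additionally use implicit parallelism, one option would be to collect items into batches (for example with drainTo) and process each batch with a parallel stream; whether that pays off depends on the workload.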

Upvotes: 3
