Mike
Mike

Reputation: 689

Read from ByteArrayOutputStream while it's being written to

I have a class that is constantly producing data and writing it to a ByteArrayOutputStream on its own thread. I have a 2nd thread that gets a reference to this ByteArrayOutputStream. I want the 2nd thread to read any data (and empty) the ByteArrayOutputStream and then stop when it doesn't get any bytes and sleep. After the sleep, I want it to try to get more data and empty it again.

The examples I see online say to use PipedOutputStream. If my first thread is making the ByteArrayOutputStream available to the outside world from a separate reusable library, I don't see how to hook up the inputStream to it.

How would one setup the PipedInputStream to connect it to the ByteArrayOutputStream to read from it as above? Also, when reading the last block from the ByteArrayOutputStream, will I see bytesRead == -1, indicating when the outputStream is closed from the first thread?

Many thanks, Mike

Upvotes: 3

Views: 3111

Answers (1)

teppic
teppic

Reputation: 7286

Write to the PipedOutputStream directly (that is, don't use a ByteArrayOutputStream at all). They both extend OutputStream and so have the same interface.

There are connect methods in both PipedOutputStream and PipedInputStream that are used to wire two pipes together, or you can use one of the constructors to create a pair.

Writes to the PipedOutputStream will block when the buffer in the PipedInputStream fills up, and reads from the PipedInputStream will block when the buffer is empty, so the producer thread will sleep (block) if it gets "ahead" of the consumer and vice versa.

After blocking the threads wait for 1000ms before rechecking the buffer, so it's good practice to flush the output after writes complete (this will wake the reader if it is sleeping).

Your input stream will see the EOF (bytesRead == -1) when you close the output stream in the producer thread.

import java.io.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipeTest {
    public static void main(String[] args) throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        // Wire an input stream to the output stream, and use a buffer of 2048 bytes
        PipedInputStream in = new PipedInputStream(out, 2048);

        ExecutorService executor = Executors.newCachedThreadPool();

        // Producer thread.
        executor.execute(() -> {
            try {
                for (int i = 0; i < 10240; i++) {
                    out.write(0);
                    // flush to wake the reader
                    out.flush();
                }
                out.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        // Consumer thread.
        executor.execute(() -> {
            try {
                int b, read = 0;
                while ((b = in.read()) != -1) {
                    read++;
                }
                System.out.println("Read " + read + " bytes.");
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        executor.shutdown();
    }
}

Upvotes: 5

Related Questions