matanox

Reputation: 13746

How to consume a process' stdout as a stream, without blocking?

In Java (or Clojure) I would like to spin up an external process and consume its stdout as a stream. Ideally, I would like to consume the process's output every time the external process flushes it, but I am not sure how to accomplish that, and in particular how to do so without blocking.

When consuming the InputStream of a shelled-out process (on Unix, a ProcessPipeInputStream), I find the inherited InputStream methods rather low-level to work with. I am also not sure whether there is a non-blocking way to read from the stream every time the producer side flushes, or any non-blocking way to consume it at all.

Many code examples block on the process's output stream in an infinite loop, tying up a whole thread just to listen. I hope this blocking behavior can be avoided altogether.

Bottom line:

Is there a non-blocking way to be notified on an input stream, every time that the producing side of it flushes?

Upvotes: 3

Views: 1135

Answers (1)

Valentin Ruano

Reputation: 2819

You need to create a separate Thread that consumes from such a stream. This lets the rest of your program do whatever it is meant to be doing in parallel.

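import java.io.IOException;
import java.io.InputStream;

class ProcessOutputReader implements Runnable {

   private final InputStream processOutput;

   public ProcessOutputReader(final InputStream processOutput) {
       this.processOutput = processOutput;
   }

   @Override
   public void run() {
      int nextByte;
      try {
         // read() blocks this reader thread (not the caller's) until a byte arrives or EOF (-1).
         while ((nextByte = processOutput.read()) != -1) {
            // do whatever you need to do byte-by-byte.
            processByte(nextByte);
         }
      } catch (final IOException ex) {
         // the stream failed or was closed; decide how to report this.
      }
   }

   private void processByte(final int b) {
      // application-specific handling goes here.
   }
}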

class Main {
   public static void main(final String[] args) {
       final Process proc = ...;
       final ProcessOutputReader reader = new ProcessOutputReader(proc.getInputStream());
       final Thread processOutputReaderThread = new Thread(reader);
       processOutputReaderThread.setDaemon(true); // allow the VM to terminate if this is the only thread still active.
       processOutputReaderThread.start();
       ...
       // if you want to wait for the whole process output to be processed at some point, you can do this:
       try {
         processOutputReaderThread.join();
       } catch (final InterruptedException ex) {
         // decide how to recover if the wait was interrupted; at minimum, restore the interrupt flag.
         Thread.currentThread().interrupt();
       }
   }
}
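To make the thread-based approach concrete, here is a minimal sketch, assuming Java 8+. It starts a real process and hands each line of its output to a callback on a background thread, so the main thread only blocks if it chooses to join(). `LineCallbackReader` and `LineCallbackDemo` are hypothetical names. Running `java -version` from the current JVM is simply an assumed, portable stand-in for "some external process". Lines are only a proxy for flushes: they coincide with the child's flushes only if the child flushes once per line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.function.Consumer;

// Hypothetical helper: delivers each line of a stream to a callback on a background thread.
final class LineCallbackReader {

    private LineCallbackReader() {}

    /** Starts a daemon thread that calls onLine for every line read from in; returns that thread. */
    static Thread start(final InputStream in, final Consumer<String> onLine) {
        final Thread t = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                // readLine() blocks only this background thread; null means EOF.
                while ((line = reader.readLine()) != null) {
                    onLine.accept(line);
                }
            } catch (final IOException ex) {
                // The stream failed or was closed; a real program would report this.
            }
        }, "process-output-reader");
        t.setDaemon(true);
        t.start();
        return t;
    }
}

// Hypothetical demo: runs "java -version" (an assumed example process) and prints its output.
class LineCallbackDemo {
    public static void main(final String[] args) throws IOException, InterruptedException {
        final String java = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
        final Process proc = new ProcessBuilder(java, "-version")
                .redirectErrorStream(true) // "java -version" writes to stderr, so merge it into stdout
                .start();
        final Thread reader = LineCallbackReader.start(proc.getInputStream(),
                line -> System.out.println("got: " + line));
        // The main thread is free to do other work here.
        reader.join(); // optionally wait until all output has been delivered
        System.out.println("exit code: " + proc.waitFor());
    }
}
```

A dedicated thread is still blocked inside readLine(). That is the cost of the blocking InputStream API. What the callback style buys you is that the rest of the program is notified of each line instead of polling for it.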

If, instead of processing byte-by-byte, you want to deal with each flush as a single piece... I'm not sure it is possible to capture each process flush with 100% reliability. After all, the process's own I/O framework (Java, C, Python, etc.) may handle the "flush" operation differently, and you may end up receiving several blocks of bytes for any given flush in the external process. The pipe itself is a plain byte stream with no message boundaries, so data from several flushes can also arrive together in a single read.

In any case, you can attempt it by using the InputStream's available() method, like so:

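   // needs java.io.IOException and java.util.Arrays to be imported.
   @Override
   public void run() {
      int nextByte;
      try {
         // block until at least one byte (or EOF) arrives.
         while ((nextByte = processOutput.read()) != -1) {
            // available() is only an estimate of how much can be read without blocking.
            final int available = processOutput.available();
            byte[] block = new byte[available + 1];
            block[0] = (byte) nextByte;
            final int actuallyAvailable = processOutput.read(block, 1, available);
            if (actuallyAvailable == -1) {
               // EOF was reached right after the first byte.
               block = new byte[] { (byte) nextByte };
            } else if (actuallyAvailable < available) {
               // trim the unused tail of the buffer.
               block = Arrays.copyOf(block, actuallyAvailable + 1);
            }
            // do whatever you need to do on that block now.
            processBlock(block);
         }
      } catch (final IOException ex) {
         // the stream failed or was closed; decide how to report this.
      }
   }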

I'm not 100% sure of this, but I don't think you can trust available() to return a guaranteed lower bound on the number of bytes you can retrieve without blocking. The InputStream documentation only describes its result as an estimate. Nor is the next read guaranteed to return that many bytes if you ask for them, because read(byte[], int, int) may return fewer bytes than requested. That is why the code above checks the actual number of bytes read (actuallyAvailable).
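If the goal is just to receive data in chunks rather than byte-by-byte, a possibly simpler alternative is to call read(byte[]) directly. Per the InputStream contract, it blocks until at least one byte is available and may then return fewer bytes than the buffer can hold. As far as I know, in OpenJDK the Unix ProcessPipeInputStream extends BufferedInputStream. Its read(byte[], int, int) stops as soon as the underlying stream reports no more available bytes, so each call tends to return "whatever has arrived so far", much like the available() trick above. This is a sketch: `ChunkReader` and `readChunks` are hypothetical names, and chunk boundaries still carry no guarantee of matching the child's flushes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical helper: delivers whatever bytes each read(byte[]) call returns as one chunk.
final class ChunkReader {

    private ChunkReader() {}

    static void readChunks(final InputStream in, final int bufferSize,
                           final Consumer<byte[]> onChunk) throws IOException {
        if (bufferSize <= 0) {
            // read() into a zero-length buffer returns 0 forever, so reject it up front.
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        final byte[] buffer = new byte[bufferSize];
        int n;
        // read(byte[]) blocks until at least one byte is available (or EOF, -1),
        // then may return fewer bytes than the buffer holds.
        while ((n = in.read(buffer)) != -1) {
            // Copy so the callback owns its chunk even after the buffer is reused.
            onChunk.accept(Arrays.copyOf(buffer, n));
        }
    }
}
```

You would call this from a background thread, for example from ProcessOutputReader.run() with proc.getInputStream(). The loop is a drop-in replacement for the byte-at-a-time loop and needs no separate available() bookkeeping.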

Upvotes: 1
