Alex Abdugafarov

Reputation: 6392

Java: merging InputStreams

My goal is to create (or find an existing) InputStream implementation, say MergeInputStream, that tries to read from multiple InputStreams and returns the first result. After that it should release the lock and stop reading from all of the InputStreams until the next mergeInputStream.read() call. I was quite surprised that I couldn't find any such tool. The thing is, none of the source InputStreams is finite (they are not files but System.in, sockets and the like), so I cannot use SequenceInputStream. I understand that this will probably require some multi-threaded mechanism, but I have absolutely no idea how to do it. I tried to google it, but with no result.

Upvotes: 4

Views: 2522

Answers (2)

aioobe

Reputation: 420991

The problem of reading input from multiple sources and serializing it into one stream is preferably solved using SelectableChannel and Selector. This, however, requires that every source can provide a selectable channel, which may or may not be the case.
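
As a rough illustration of the Selector approach, here is a minimal sketch. It assumes every source is a SelectableChannel that also implements ReadableByteChannel (for example SocketChannel or Pipe.SourceChannel) and that only one thread calls read(). The class name SelectorMergedInputStream is made up for this example, and java.nio.channels.Pipe is used only to get selectable sources for the demo.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical name; a sketch, not a hardened implementation.
class SelectorMergedInputStream extends InputStream {
    private final Selector selector;
    private final SelectableChannel[] sources;
    private final ByteBuffer oneByte = ByteBuffer.allocate(1);
    private int open; // sources that have not reached end of stream yet

    // Assumption: each source also implements ReadableByteChannel.
    SelectorMergedInputStream(SelectableChannel... sources) throws IOException {
        this.sources = sources;
        this.open = sources.length;
        this.selector = Selector.open();
        for (SelectableChannel ch : sources) {
            ch.configureBlocking(false); // required before registering
            ch.register(selector, SelectionKey.OP_READ);
        }
    }

    @Override
    public int read() throws IOException {
        while (open > 0) {
            // Block only when no previously selected key is left to examine.
            if (selector.selectedKeys().isEmpty())
                selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                oneByte.clear();
                int n = ((ReadableByteChannel) key.channel()).read(oneByte);
                if (n > 0)
                    return oneByte.get(0) & 0xFF; // unsigned byte value
                if (n < 0) {                      // this source is exhausted
                    key.channel().close();
                    open--;
                }
                // n == 0: nothing to read after all, try the next key
            }
        }
        return -1; // every source has reached end of stream
    }

    @Override
    public void close() throws IOException {
        for (SelectableChannel ch : sources)
            ch.close();
        selector.close();
    }

    public static void main(String[] args) throws IOException {
        Pipe a = Pipe.open();
        Pipe b = Pipe.open();
        a.sink().write(ByteBuffer.wrap(new byte[] {'h', 'i'}));
        b.sink().write(ByteBuffer.wrap(new byte[] {'!'}));
        a.sink().close();
        b.sink().close();
        try (InputStream in = new SelectorMergedInputStream(a.source(), b.source())) {
            int c;
            while ((c = in.read()) != -1)
                System.out.print((char) c);
        }
        System.out.println();
    }
}
```

The order in which bytes from different sources are interleaved depends on the selector, so only the order within a single source should be relied upon.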

If selectable channels are not available, you could choose to solve it with a single thread by letting the read-implementation do the following: For each input stream is, check if is.available() > 0, and if so return is.read(). Repeat this procedure until some input stream has data available.

This method, however, has two major drawbacks:

  1. Not all implementations of InputStream implement available() in such a way that it returns 0 if and only if read() would block. The result is, naturally, that data may never be read from such a stream, even though is.read() would return a value. Whether this should be considered a bug is debatable, as the documentation merely states that available() should return an "estimate" of the number of bytes available.

  2. It uses a so-called "busy loop", which basically means that you'll either need to put a sleep in the loop (which adds read latency) or hog the CPU unnecessarily.
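
The single-threaded polling approach described above could look roughly like this. It is only a sketch: the class name PollingMergedInputStream and the sleepMillis parameter are made up for this example, and the round-robin start index is an addition so that one busy source does not starve the others.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical name; illustrates polling with available(), drawbacks included.
class PollingMergedInputStream extends InputStream {
    private final InputStream[] sources;
    private final long sleepMillis; // pause between polling rounds (assumed tunable)
    private int next = 0;           // where the next polling round starts

    PollingMergedInputStream(long sleepMillis, InputStream... sources) {
        this.sleepMillis = sleepMillis;
        this.sources = sources;
    }

    @Override
    public int read() throws IOException {
        while (true) {
            // Check every source once, starting after the one served last.
            for (int i = 0; i < sources.length; i++) {
                int idx = (next + i) % sources.length;
                if (sources[idx].available() > 0) {
                    next = (idx + 1) % sources.length;
                    return sources[idx].read();
                }
            }
            // Nothing available anywhere: sleep instead of spinning.
            // Note: available() is also 0 at end of stream, so this sketch
            // never returns -1; it keeps polling forever once all sources
            // are exhausted.
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new PollingMergedInputStream(10,
                new ByteArrayInputStream(new byte[] {1, 2}),
                new ByteArrayInputStream(new byte[] {10}));
        // Read exactly the three bytes that exist; a fourth read() would
        // poll forever, as explained above.
        for (int i = 0; i < 3; i++)
            System.out.println(in.read());
    }
}
```

Because available() cannot distinguish "no data yet" from "end of stream", detecting the end of the merged stream needs some other signal, which is one more reason to prefer the selector or thread-based variants.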

Your third option is to deal with the blocking reads by spawning one thread for each input stream. This, however, requires careful synchronization and may add noticeable overhead if you have a very large number of input streams to read from. The code below is a first attempt to solve it. I'm by no means certain that it is sufficiently synchronized, or that it manages the threads in the best possible way.

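import java.io.*;
import java.util.concurrent.*;

public class MergedInputStream extends InputStream {

    // Marker a reader thread enqueues once its source is exhausted or fails.
    private static final int EOF = -1;

    // Capacity 1: a reader thread blocks until read() has consumed its byte.
    private final BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
    private final InputStream[] sources;
    private final Thread[] readers;
    private volatile boolean closed = false;

    // Only touched by the thread that calls read().
    private int openStreamCount;

    public MergedInputStream(InputStream... sources) {
        this.sources = sources;
        this.openStreamCount = sources.length;
        this.readers = new Thread[sources.length];
        for (int i = 0; i < sources.length; i++) {
            readers[i] = new ReadThread(i);
            readers[i].setDaemon(true);   // don't keep the JVM alive
            readers[i].start();
        }
    }

    @Override
    public void close() throws IOException {
        closed = true;
        for (Thread t : readers)
            t.interrupt();                // release readers blocked in buf.put()
        IOException first = null;
        for (InputStream is : sources) {
            try {
                is.close();
            } catch (IOException e) {
                if (first == null)
                    first = e;
                else
                    first.addSuppressed(e);
            }
        }
        if (first != null)
            throw first;
    }

    @Override
    public int read() throws IOException {
        if (closed)
            throw new IOException("Stream closed");
        try {
            // Consume EOF markers until a data byte arrives or all sources are done.
            // Checking a counter before take() would hang if the last source
            // ended while this thread was already waiting.
            while (openStreamCount > 0) {
                int data = buf.take();
                if (data != EOF)
                    return data;
                openStreamCount--;
            }
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    private class ReadThread extends Thread {

        private final int src;

        public ReadThread(int src) {
            this.src = src;
        }

        @Override
        public void run() {
            try {
                try {
                    int data;
                    while ((data = sources[src].read()) != -1)
                        buf.put(data);
                } catch (IOException ioex) {
                    // A failing source is treated as having reached its end.
                }
                buf.put(EOF);
            } catch (InterruptedException e) {
                // close() interrupted this thread; exit without signalling.
            }
        }
    }
}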

Upvotes: 2

Grodriguez

Reputation: 21995

I can think of three ways to do this:

  • Use non-blocking I/O (API documentation). This is the cleanest solution.
  • Multiple threads, one for each merged input stream. The threads would block on the read() method of the associated input stream, then notify the MergeInputStream object when data becomes available. The read() method in MergeInputStream would wait for this notification, then read data from the corresponding stream.
  • Single thread with a busy loop. Your MergeInputStream.read() method would need to loop, checking the available() method of every merged input stream. If no data is available, sleep a few ms. Repeat until data becomes available in one of the merged input streams.

Upvotes: 1
