Jens Egholm

Reputation: 2710

Java 8 Stream utilities for input data

Imagine having some sort of incoming data either by callback or an InputStream that you need to continuously convert into a Java 8 Stream. We don't know when the incoming data-stream stops, but we know that it can stop.

So far I've seen two ways of getting around this problem, and I'm interested in the best practices for it, mainly because this must be something others have faced before. There must be a simpler way of doing it than the ideas below.

1) The simplest way is to treat the source as a Supplier and use Stream.generate to serve data:

Stream.generate(() -> blockCallToGetData());

However, this has the disadvantage that the stream never ends. When the input source stops sending, the stream just keeps calling the method. We could of course throw a RuntimeException to break out, but this can get ugly fast.

2) The second idea is to use an Iterator (converted to a Spliterator) whose next method blocks until the next element is available. As a crude example:

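import java.util.Iterator;

class BlockingIterator implements Iterator<Data> {

  @Override
  public boolean hasNext() {
    // Always true: this crude version never signals the end of the input.
    return true;
  }

  @Override
  public Data next() {
    // Blocks until the source delivers the next element.
    return blockCallToGetData();
  }

}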
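For reference, the "converted to a Spliterator" step can be sketched with Spliterators.spliteratorUnknownSize and StreamSupport.stream, both part of the JDK since Java 8. The class name IteratorStreams and the finite sample iterator are hypothetical stand-ins for the blocking source; this is only a minimal sketch.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorStreams {

  // Wraps any Iterator in a sequential, lazily evaluated Stream.
  // The stream ends as soon as iterator.hasNext() returns false.
  static <T> Stream<T> toStream(Iterator<T> iterator) {
    Spliterator<T> spliterator =
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
    return StreamSupport.stream(spliterator, false);
  }

  public static void main(String[] args) {
    // A finite iterator standing in for the blocking source.
    Iterator<String> source = Arrays.asList("a", "b", "c").iterator();
    List<String> result = toStream(source).collect(Collectors.toList());
    System.out.println(result);
  }
}
```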

The advantage of this is that I can stop the stream by returning false from the hasNext method. However, in situations where we do not control the speed of the incoming data (such as in a callback), we would need to keep a buffer of ready elements for the iterator. This buffer could grow without bound before anyone calls next on the iterator.

So my question is: what is the best practice for serving blocking input into a stream?
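To return false from hasNext, the iterator has to know whether another element exists, which for a blocking source means fetching one element ahead. A minimal sketch of that idea follows; the class name LookaheadIterator and the convention that the supplier returns null at end of input are assumptions made for illustration, and it covers only the pull-style case, not the callback case.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical sketch: the supplier blocks until data arrives and returns
// null once the source is exhausted (an assumed end-of-input convention).
class LookaheadIterator<T> implements Iterator<T> {

  private final Supplier<T> blockingSource;
  private T next;        // element fetched ahead of time, if any
  private boolean done;  // true once the source reported end of input

  LookaheadIterator(Supplier<T> blockingSource) {
    this.blockingSource = blockingSource;
  }

  @Override
  public boolean hasNext() {
    if (next == null && !done) {
      next = blockingSource.get();  // may block here
      done = (next == null);
    }
    return next != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T result = next;
    next = null;  // force a fresh fetch on the following hasNext()
    return result;
  }
}
```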

Upvotes: 1

Views: 683

Answers (2)

John McClean

Reputation: 5313

In simple-react we solved this problem by using (simple-react) async Queues (an async wrapper over JDK Queue data-structures) that JDK Streams could read from. If the Queue is closed, the Stream will automatically disconnect.

The fast producer / slow consumer problem can be solved by the Queues. If the (simple-react) async Queue is backed by a bounded Blocking Queue, it will automatically slow down (block) any producing threads once the Queue gets full.
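The same two ideas (a closed queue ends the stream, a bounded queue slows the producer) can be sketched with plain JDK classes. This is not how simple-react implements it; the class QueueBackedStream and the CLOSED marker are hypothetical names, and the sketch assumes a single sequential consumer.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class QueueBackedStream {

  // Hypothetical end-of-input marker; compared by identity, never emitted.
  static final Object CLOSED = new Object();

  // Builds a Stream that drains the queue until the CLOSED marker is seen.
  @SuppressWarnings("unchecked")
  static <T> Stream<T> stream(BlockingQueue<Object> queue) {
    Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<T>(
        Long.MAX_VALUE, Spliterator.ORDERED) {
      @Override
      public boolean tryAdvance(Consumer<? super T> action) {
        try {
          Object item = queue.take();  // blocks while the queue is empty
          if (item == CLOSED) {
            return false;              // ends the stream
          }
          action.accept((T) item);
          return true;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;                // treat interruption as end of input
        }
      }
    };
    return StreamSupport.stream(spliterator, false);
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);  // bounded buffer
    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 5; i++) {
          queue.put(i);                // blocks while the queue is full
        }
        queue.put(CLOSED);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    List<Integer> result =
        QueueBackedStream.<Integer>stream(queue).collect(Collectors.toList());
    producer.join();
    System.out.println(result);
  }
}
```

Because the queue holds at most two elements, the producer thread is parked in put until the stream has consumed something, so the buffer cannot grow without bound.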

In contrast, the LazyFutureStream implementation uses non-blocking Queues internally. If no data is present, it will even attempt to turn itself from a consumer of data from the Queue into a producer, so it can operate as an entirely non-blocking Stream.

Example using PushableStreamBuilder:

PushableLazyFutureStream<Integer> pushable = new PushableStreamBuilder()
        .withBackPressureAfter(100)
        .withBackPressureOn(true)
        .pushableLazyFutureStream();

// pushable.getInput().fromStream(input); would also be acceptable to add input data
pushable.getInput().add(100);
pushable.getInput().close();

List list = pushable.getStream().collect(Collectors.toList());

// list is [100]

Upvotes: 0

Brian Goetz

Reputation: 95456

The question contains a questionable assumption: that there is a good practice for serving blocking input into a stream. Stream is not a reactive framework; while you can wedge it into being one with a big crowbar, the problems will likely pop out elsewhere as a result. (The EG considered these use cases and concluded that we were better off providing something that does a complete job at one problem rather than a half job at two.)

If you need a reactive framework, the best practice is to use one. RxJava is great.

Upvotes: 5
