Reputation: 2710
Imagine having some sort of incoming data either by callback or an InputStream
that you need to continuously convert into a Java 8 Stream
. We don't know when the incoming data-stream stops, but we know that it can stop.
So far I've seen two ways of getting around this problem and I'm interested in the best practices on how to achieve this. Mainly because I this must be something someone has faced before. There must be a simpler way of doing it than the ideas below.
1) The most simple way is to treat the source as a Supplier
and just use Stream.generate
to serve data:
Stream.generate(() -> blockCallToGetData());
However, this has the disadvantage, that the stream never ends. So whenever an input source stops sending, the stream just keeps calling the method. Unless we throw a Runtime exception naturally, but this can get ugly fast.
2) The second idea is to use a Iterator
(converted to a Spliterator
) where the next
method blocks until we find the next element. As a crude example:
class BlockingIterator implements Iterator<Data> {
@Override void boolean hasNext() {
return true;
}
@Override Data next() {
return blockCallToGetData();
}
}
The advantage of this is that I can stop the stream by returning false
in the hasNext
method. However, in situations where we do not control the speed of the incoming data (such as in a callback), we would need to keep a buffer of ready elements for the iterator. This buffer could grow infinitely big, before someone calls next
on the iterator.
So, my question goes; what is the best practice for serving blocking input into a stream?
Upvotes: 1
Views: 683
Reputation: 5313
In simple-react we solved this problem by using (simple-react) async Queues (an async wrapper over JDK Queue data-structures) that JDK Streams could read from. If the Queue is closed, the Stream will automatically disconnect.
The fast producer / slow consumer problem can be solved by the Queues. If the (simple-react) async Queue is backed by a bounded Blocking Queue, it will automatically slow down (block) any producing threads once the Queue gets full.
In contrast, the LazyFutureStream stream implementation uses non-blocking Queues internally and will even attempt to turn itself from consumer of data from the Queue, to a producer, if there is no data present (and as such it can operate as an entirely non-blocking Stream)
Example Using PushableStreamBuilder :
PushableLazyFutureStream<Integer> pushable = new PushableStreamBuilder()
.withBackPressureAfter(100)
.withBackPressureOn(true)
.pushableLazyFutureStream();
// pushable.getInput().fromStream(input); would also be acceptable to add input data
pushable.getInput().add(100);
pushable.getInput().close();
List list = pushable.getStream().collect(Collectors.toList());
//list is [100]
Upvotes: 0
Reputation: 95456
The question contains a questionable assumption: that there is a good practice for serving blocking input into a stream. Stream is not a reactive framework; while you can wedge it into being one with a big crowbar, the problems will likely pop out elsewhere as a result. (The EG considered these use cases and concluded that we were better off providing something that does a complete job at one problem rather than a half job at two.)
If you need a reactive framework, the best practice is to use one. RxJava is great.
Upvotes: 5