Reputation: 509
I've been playing with CompletionStage/CompletableFuture in Java 8 in order to do asynchronous processing, which works quite well. However, sometimes I want a stage to perform asynchronous processing of an iterator/stream of items, and there doesn't seem to be a way to do this.
Specifically, Stream.forEach() has the semantics that after the call all items have been processed. I would want the same thing, but with a CompletionStage instead, e.g.:
CompletionStage<Void> done = stream.forEach(...);
done.thenRun(...);
If the Stream is backed by an asynchronous streaming result, this would be much better than blocking in the calling code until the stream has been fully processed.
Is it possible to construct this with current Java 8 API somehow? Workarounds?
Upvotes: 36
Views: 37371
Reputation: 1481
Yes, it is possible using the standard API:
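import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BufferedAsyncStreamable<T> implements Iterator<T> {
    // Sentinel that the producer enqueues to signal the end of the stream.
    private static final Object END_MARKER = new Object();
    private final BlockingQueue<Object> queue;
    // Element fetched by hasNext() but not yet returned by next(); null if none.
    private Object currentValue;

    public BufferedAsyncStreamable(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }
    // Blocks while the buffer is full instead of throwing IllegalStateException.
    public void add(T obj) {
        put(obj);
    }
    public void end() {
        put(END_MARKER);
    }
    private void put(Object obj) {
        try {
            queue.put(obj);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean hasNext() {
        if (currentValue == null) { // fetch only once, so repeated calls are safe
            try {
                currentValue = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return currentValue != END_MARKER;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T value = (T) currentValue;
        currentValue = null;
        return value;
    }
    public Stream<T> stream() {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(this, 0), false);
    }
}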
Example usage:
var buffer = new BufferedAsyncStreamable<String>(10);
new Thread(() -> {
    var input = new Scanner(System.in);
    for (String line; !"end".equals(line = input.nextLine());) {
        buffer.add(line);
    }
    buffer.end();
}).start();
buffer.stream()
    .map(String::toUpperCase)
    .forEach(System.err::println);
Reads lines from stdin (until "end") and converts them to uppercase as they come in.
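Note that the usage above still blocks the calling thread inside forEach. To get the CompletionStage<Void> the question asks for, one option is to hand the terminal operation to an executor. The following is only a minimal sketch: the class name AsyncForEach and the helper forEachAsync are hypothetical, and it assumes you can spare a worker thread that may block while the stream waits for more items.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class AsyncForEach {
    // Hypothetical helper: runs stream.forEach(action) on the given executor and
    // returns a stage that completes once every item has been processed.
    static <T> CompletionStage<Void> forEachAsync(
            Stream<T> stream, Consumer<? super T> action, Executor executor) {
        return CompletableFuture.runAsync(() -> stream.forEach(action), executor);
    }

    public static void main(String[] args) {
        CompletionStage<Void> done =
                forEachAsync(Stream.of("a", "b"), System.out::println, ForkJoinPool.commonPool());
        // join() is only here so the demo does not exit early.
        done.thenRun(() -> System.out.println("all items processed"))
            .toCompletableFuture()
            .join();
    }
}
```

If the stream is backed by a blocking source such as the queue above, a dedicated executor is probably a better choice than the common pool, since the worker stays occupied until the producer calls end().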
Upvotes: 1
Reputation: 11
As mentioned, use reactive programming:
Maven:
groupId: io.projectreactor
artifactId: reactor-core
Example:
Flux.fromIterable(Arrays.asList("een", "twee"))
    .flatMap(string -> translateDutchToEnglish(string))
    .parallel()
    .runOn(Schedulers.parallel())
    .sequential()
    .collectList()
    .block();
This creates a Flux, which is a stream of items that are emitted asynchronously, and applies an action to each item: flatMap with a callback.
parallel() then splits the Flux into parallel rails, and runOn() selects the Scheduler on which the downstream work runs. Schedulers.parallel() provides as many worker threads as there are CPU cores.
sequential() merges the rails back into a single Flux so that collectList() can gather all results into one list again.
To wait for all threads to finish their work, use block(). It returns a list containing the results.
Upvotes: 1
Reputation: 3050
As far as I know, the streams API does not support asynchronous event processing. Sounds like you want something like Reactive Extensions for .NET, and there is a Java port of it called RxJava, created by Netflix.
RxJava supports many of the same high-level operations as Java 8 streams (such as map and filter) and is asynchronous.
Update: There is now a reactive streams initiative in the works, and it looks like JDK 9 will include support for at least part of it through the Flow class.
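For what it is worth, java.util.concurrent.Flow did ship in JDK 9, together with SubmissionPublisher, whose consume() method returns a CompletableFuture<Void> that completes once the publisher is closed and all items have been handled. That is fairly close to the asynchronous forEach asked about. A minimal sketch, assuming JDK 9 or later; the class and method names here are made up for illustration:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

final class FlowForEachSketch {
    // Publishes the given items asynchronously and returns what the consumer saw.
    static List<String> publishAndCollect(List<String> items) {
        List<String> seen = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // consume() subscribes the callback; the returned future completes
        // after the publisher is closed and every item has been processed.
        CompletableFuture<Void> done =
                publisher.consume(item -> seen.add(item.toUpperCase()));

        items.forEach(publisher::submit); // may block if the subscriber's buffer is full
        publisher.close();                // signals onComplete to the subscriber

        done.join(); // demo only; real code would chain done.thenRun(...)
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("een", "twee")));
    }
}
```

By default the callback runs on ForkJoinPool.commonPool(); SubmissionPublisher also has a constructor that takes an Executor and a buffer capacity if you need control over that.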
Upvotes: 24
Reputation: 1562
It is possible to produce a stream, map each element to a CompletionStage, and collect the results using CompletionStage.thenCombine(), but the resulting code will not be more readable than a simple for loop like this:
CompletionStage<Collection<Result>> collectionStage =
    CompletableFuture.completedFuture(
        new LinkedList<>()
    );
for (Request request : requests) {
    CompletionStage<Result> resultStage = performRequest(request);
    collectionStage = collectionStage.thenCombine(
        resultStage,
        (collection, result) -> {
            collection.add(result);
            return collection;
        }
    );
}
return collectionStage;
This example can easily be transformed into a functional forEach without losing readability, but using the stream's reduce or collect requires additional, less elegant code.
Update: CompletableFuture.allOf and CompletableFuture.join provide another, more readable way of transforming a collection of future results into a future collection of results.
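A minimal sketch of the allOf/join approach, using only the Java 8 API. The class name AllOfExample and the helper sequence are hypothetical names chosen for this illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class AllOfExample {
    // Turns a list of futures into a future list, preserving the input order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every future is complete at this point, so join() does not block.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2));
        sequence(futures).thenAccept(System.out::println).join();
    }
}
```

If any input future fails, the future returned by allOf completes exceptionally, and so does the resulting future list.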
Upvotes: 2
Reputation: 49095
As @KarolKrol alluded to, you can do it with a stream of CompletableFuture.
There is a library called cyclops-react that builds on top of JDK 8 streams to facilitate working with streams of CompletableFuture.
To compose your streams you can use cyclops-react's fluent, promise-like API, or you can use simple-react's Stages.
Upvotes: 7
Reputation: 5313
cyclops-react (I am the author of this library) provides a StreamUtils class for processing Streams. One of the functions it provides is futureOperations, which gives access to the standard Stream terminal operations (and then some) with a twist: the Stream is executed asynchronously and the result is returned inside a CompletableFuture, e.g.:
Stream<Integer> stream = Stream.of(1,2,3,4,5,6)
    .map(i->i+2);
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream,
        Executors.newFixedThreadPool(1))
    .collect(Collectors.toList());
There is also a convenience class, ReactiveSeq, that wraps Stream and provides the same functionality with a nice fluent API:
CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6)
    .map(i->i+2)
    .futureOperations(
        Executors.newFixedThreadPool(1))
    .collect(Collectors.toList());
As Adam has pointed out, cyclops-react FutureStreams are designed to process data asynchronously (by mixing Futures and Streams together). They are particularly suited to multi-threaded operations that involve blocking I/O (such as reading files, making DB calls, making REST calls, etc.).
Upvotes: 3