Reputation: 4129
I want to run a stream operation whose output is fed back in as the source of the same stream, within the same operation.
I currently perform this sort of operation using a queue; I remove an item, process it, and add any results that need further processing back to the queue. Here are two examples of this sort of thing:
Queue<WorkItem> workQueue = new ArrayDeque<>(workToDo);
while (!workQueue.isEmpty()) {
    WorkItem item = workQueue.remove();
    item.doOneWorkUnit();
    if (!item.isDone()) workQueue.add(item);
}
Queue<Node> nodes = new ArrayDeque<>(rootNodes);
while (!nodes.isEmpty()) {
    Node node = nodes.remove();
    process(node);
    nodes.addAll(node.children());
}
I would imagine that the first could be performed concurrently like this:
try {
    LinkedBlockingQueue<WorkItem> workQueue = new LinkedBlockingQueue<>();
    Stream<WorkItem> reprocess = Stream.generate(() -> workQueue.remove()).parallel();
    Stream.concat(workToDo.parallelStream(), reprocess)
        .filter(item -> {item.doOneWorkUnit(); return !item.isDone();})
        .collect(Collectors.toCollection(() -> workQueue));
} catch (NoSuchElementException e) {}
And the second as:
try {
    LinkedBlockingQueue<Node> reprocessQueue = new LinkedBlockingQueue<>();
    Stream<Node> reprocess = Stream.generate(() -> reprocessQueue.remove()).parallel();
    Stream.concat(rootNodes.parallelStream(), reprocess)
        .filter(item -> {process(item); return true;})
        .flatMap(node -> node.children().parallelStream())
        .collect(Collectors.toCollection(() -> reprocessQueue));
} catch (NoSuchElementException e) {}
However, these feel like kludgy workarounds, and I dislike having to resort to using exceptions. Does anyone have a better way to do this sort of thing?
Upvotes: 2
Views: 1414
Reputation: 13535
To run the work in parallel, I would use a standard java.util.concurrent.Executor. To put a task back on the work queue, call executor.execute(this) at the end of the task's own code.
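A rough sketch of that idea applied to the first example in the question. The WorkItem class here is a hypothetical stand-in (the question does not show it), and the Task wrapper, the CountDownLatch used to detect when everything is finished, and the pool size are my own assumptions rather than anything required by Executor:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ResubmitDemo {

    /** Hypothetical stand-in for the question's WorkItem: done after a fixed number of units. */
    static final class WorkItem {
        private final int unitsNeeded;
        private int unitsDone;

        WorkItem(int unitsNeeded) { this.unitsNeeded = unitsNeeded; }

        void doOneWorkUnit() { unitsDone++; }
        boolean isDone() { return unitsDone >= unitsNeeded; }
        int unitsDone() { return unitsDone; }
    }

    /** Runs one work unit, then either resubmits itself or signals completion. */
    static final class Task implements Runnable {
        private final WorkItem item;
        private final ExecutorService executor;
        private final CountDownLatch unfinished;

        Task(WorkItem item, ExecutorService executor, CountDownLatch unfinished) {
            this.item = item;
            this.executor = executor;
            this.unfinished = unfinished;
        }

        @Override
        public void run() {
            item.doOneWorkUnit();
            if (item.isDone()) {
                unfinished.countDown();   // one fewer item left to finish
            } else {
                executor.execute(this);   // back onto the executor's queue
            }
        }
    }

    /**
     * Processes every item to completion on a fixed-size pool.
     * Assumption: doOneWorkUnit() does not throw; if it did, the latch
     * would never reach zero and await() would block.
     */
    static void processAll(List<WorkItem> workToDo, int threads) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch unfinished = new CountDownLatch(workToDo.size());
        try {
            for (WorkItem item : workToDo) {
                executor.execute(new Task(item, executor, unfinished));
            }
            unfinished.await();           // wait until every item reports done
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<WorkItem> work = Arrays.asList(new WorkItem(3), new WorkItem(1), new WorkItem(5));
        processAll(work, 4);
        for (WorkItem item : work) {
            System.out.println("done=" + item.isDone() + " units=" + item.unitsDone());
        }
    }
}
```

Each item is only ever held by one task at a time, so the item itself needs no locking in this sketch. The latch is just one way to know when to stop; the executor's internal queue plays the role of workQueue, so no exception is needed to end the loop.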
Upvotes: 1