AJMansfield
AJMansfield

Reputation: 4129

Java 8 Iterating Stream Operations

I want to perform a stream where the output from the stream is then used as the source for the same stream, in the same operation.

I currently perform this sort of operation using a queue; I remove an item, process it, and add any results that need further processing back to the queue. Here are two examples of this sort of thing:

Queue<WorkItem> workQueue = new Queue<>(workToDo);
while(!workQueue.isEmpty()){
    WorkItem item = workQueue.remove();
    item.doOneWorkUnit();
    if(!item.isDone()) workQueue.add(item);
}

Queue<Node> nodes = new Queue<>(rootNodes);
while(!nodesLeft.isEmpty()){
    Node node = nodes.remove();
    process(node);
    nodes.addAll(node.children());
}

I would imagine that the first could be performed concurrently like this:

try {
    LinkedBlockingQueue<WorkItem> workQueue = new LinkedBlockingQueue<>();
    Stream<WorkItem> reprocess = Stream.generate(() -> workQueue.remove()).parallel();

    Stream.concat(workToDo.parallelstream(), reprocess)
          .filter(item -> {item.doOneWorkUnit(); return !item.isDone();})
          .collect(Collectors.toCollection(() -> workQueue));
} catch (NoSuchElementException e){}

And the second as:

try {
    LinkedBlockingQueue<Node> reprocessQueue = new LinkedBlockingQueue<>();
    Stream<WorkItem> reprocess = Stream.generate(() -> nodes.remove()).parallel();

    Stream.concat(rootNodes.parallelStream(), reprocess)
          .filter(item -> {process(item); return true;})
          .flatMap(node -> node.children().parallelStream())
          .collect(Collectors.toCollection(() -> reprocessQueue));
} catch (NoSuchElementException e){}

However, these feel like kludgy workarounds, and I dislike having to resort to using exceptions. Does anyone have a better way to do this sort of thing?

Upvotes: 2

Views: 1414

Answers (1)

Alexei Kaigorodov
Alexei Kaigorodov

Reputation: 13535

To make work parallel, I would use standard java.util.concurrent.Executor. To return the task back to working queue, in the end of the code of each task, add executor.execute(this).

Upvotes: 1

Related Questions