Just Me

Reputation: 431

Stopping condition for parallel processing of a queue, where tasks may generate more tasks

I'm using a PriorityBlockingQueue<T> q to handle parallel processing of tasks. The queue is initially seeded with some tasks, and processing a task may produce more tasks, which are then added to the queue. I want to process the tasks in parallel and stop once all of them have been processed. Of course, the queue may temporarily become empty before we're done, while other threads are still processing tasks that may yet add new ones.

My question is: what's a good (correct, of course, but also elegant, idiomatic, with no unnecessary locking or waiting when the queue is empty) way to do this in Java?

Notes:

  1. I'm using a priority queue, but the tasks may be processed in any order (there's some gain in handling the tasks in roughly the order specified by the priority - but I think it's safe to just ignore this bit).

  2. This answer ("use the Task Parallel Library") seems to address the issue for C#, but it seems that this doesn't exist for Java. This is essentially the same question; it does not seem to have a completely satisfactory answer...

  3. The processing of each task is quite lengthy, so it's OK to have a bit more overhead for the task management if it makes the code more elegant (that's also why I'm happy to have a single thread in charge of polling tasks from the queue and assigning them to the workers).

Example: As a rough approximation, I'm trying to use parallel BFS to find the depth of a tree that's dynamically generated. You can think of it as looking for a strategy that will maximize your reward while playing a game, where you get a point for every move you make. Each state is a task to be processed. You start at the initial (root) state, you compute all the moves (this computation is lengthy and may generate thousands of moves), and add the states reached by these moves as tasks to be explored.
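
A sequential baseline may make the example concrete. This is only a sketch under stated assumptions: the integer states and the `moves` generator are hypothetical stand-ins for the real game states and the lengthy move computation. Run sequentially, "the queue is empty" is a valid stopping condition; the difficulty described above only appears once several threads poll the same queue.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

class TreeDepthBaseline {
    // Hypothetical move generator: a state n >= 2 leads to n/2 and n/3,
    // smaller states are terminal. The real computation would be lengthy.
    static List<Integer> moves(int state) {
        if (state >= 2) {
            return Arrays.asList(state / 2, state / 3);
        }
        return Collections.emptyList();
    }

    // Returns the largest number of moves on any path from the root state.
    static int maxDepth(int root) {
        Queue<int[]> tasks = new ArrayDeque<>(); // each task is {state, depth}
        tasks.add(new int[] {root, 0});
        int best = 0;
        // With a single thread, an empty queue really does mean "all done".
        while (!tasks.isEmpty()) {
            int[] task = tasks.poll();
            best = Math.max(best, task[1]);
            for (int next : moves(task[0])) {
                tasks.add(new int[] {next, task[1] + 1}); // processing a task creates new tasks
            }
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println("max depth from 10: " + maxDepth(10));
    }
}
```

With several workers, the same `isEmpty()` check can succeed while another thread is still inside `moves(...)` and about to add tasks, which is exactly the case the stopping condition has to handle.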

Upvotes: 1

Views: 386

Answers (1)

Just Me

Reputation: 431

I realized a solution should probably allow all threads to submit new tasks recursively, which led me to this answer.

Here's a fully fleshed-out version of that answer. It traverses a binary tree obtained by starting from some string and repeatedly splitting it roughly in half. To support priorities, one can simply modify MyTask.run() to pop the string from some auxiliary PriorityBlockingQueue; I decided to omit this because it just clutters the essence of the solution.

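        // Needs java.util.concurrent.{ExecutorService, Executors, Phaser, TimeUnit}.
        int nProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("number of processors found " + nProcessors);
        final ExecutorService taskExecutor = Executors.newFixedThreadPool(nProcessors);

        // The phaser tracks outstanding tasks: one registered party per task.
        final Phaser phaser = new Phaser();
        int phase = phaser.getPhase();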

        class MyTask implements Runnable {
            private final String input;

            public MyTask(String input) {
                this.input = input;
            }

            @Override
            public void run() {
                System.out.println("running on \"" + this.input + "\" invoked in thread " + Thread.currentThread().getName());

                int l = this.input.length();
                if (l > 1) {
                    String beginning = this.input.substring(0, l / 2);
                    String ending = this.input.substring(l / 2);
                    // Register one party per child before this task arrives,
                    // so the phase cannot advance while children are pending.
                    phaser.register();
                    phaser.register();
                    taskExecutor.execute(new MyTask(beginning));
                    taskExecutor.execute(new MyTask(ending));
                }
                // This task is done; the phase advances once every registered task has arrived.
                phaser.arrive();
            }
        }

        phaser.register();
        taskExecutor.execute(new MyTask("a lengthy string to be processed goes here"));
        System.out.println("MAIN: initial task submitted at " + Thread.currentThread().getName() + " awaiting phaseAdvance");
        phaser.awaitAdvance(phase);
        System.out.println("MAIN: all tasks arrived, ok to shutdown?");
        try {
            taskExecutor.shutdown();
            System.out.println("MAIN: shutting down awaiting termination...");
            taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            System.out.println("MAIN: await termination finished.");
        } catch (InterruptedException e) {
            System.out.println("caught an interrupted exception " + e);
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }

The output is

number of processors found 4
MAIN: initial task submitted at main awaiting phaseAdvance
running on "a lengthy string to be processed goes here" invoked in thread pool-1-thread-1
running on "a lengthy string to b" invoked in thread pool-1-thread-2
running on "e processed goes here" invoked in thread pool-1-thread-3
running on "e processe" invoked in thread pool-1-thread-3
running on "a lengthy " invoked in thread pool-1-thread-4
running on "string to b" invoked in thread pool-1-thread-3
running on "d goes here" invoked in thread pool-1-thread-3
running on "e pro" invoked in thread pool-1-thread-3
running on "cesse" invoked in thread pool-1-thread-3
.
.
.
running on "b" invoked in thread pool-1-thread-2
running on " " invoked in thread pool-1-thread-1
running on "e" invoked in thread pool-1-thread-4
running on "i" invoked in thread pool-1-thread-1
running on "h" invoked in thread pool-1-thread-2
running on " " invoked in thread pool-1-thread-3
running on "n" invoked in thread pool-1-thread-4
MAIN: all tasks arrived, ok to shutdown?
MAIN: shutting down awaiting termination...
MAIN: await termination finished.
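
For completeness, here is a rough sketch of the priority variant mentioned above, where each pool slot pops its string from an auxiliary PriorityBlockingQueue. It is an illustration, not tested at scale. The class name `PriorityPhaserSketch`, the helper names, and the "longer strings first" ordering are assumptions made up for this example; the Phaser bookkeeping is the same as in the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PriorityPhaserSketch {
    // Assumed priority for illustration: longer strings are processed first.
    private final PriorityBlockingQueue<String> pending =
            new PriorityBlockingQueue<>(16, (a, b) -> Integer.compare(b.length(), a.length()));
    private final Phaser phaser = new Phaser();
    private final AtomicInteger processed = new AtomicInteger();
    private final ExecutorService executor;

    PriorityPhaserSketch(int nThreads) {
        this.executor = Executors.newFixedThreadPool(nThreads);
    }

    private void submit(String input) {
        phaser.register();              // one party per outstanding task
        pending.add(input);             // enqueue before handing a slot to the pool
        executor.execute(this::runOne); // the slot is not tied to this particular string
    }

    private void runOne() {
        try {
            // Never null: exactly one string was enqueued for every submitted slot.
            String input = pending.poll();
            processed.incrementAndGet();
            int l = input.length();
            if (l > 1) {
                submit(input.substring(0, l / 2));
                submit(input.substring(l / 2));
            }
        } finally {
            phaser.arrive();            // arrive only after the children are registered
        }
    }

    // Processes the whole tree and returns the number of tasks that ran.
    int processAll(String root) {
        int phase = phaser.getPhase();
        submit(root);
        phaser.awaitAdvance(phase);     // returns once every registered task has arrived
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        int n = new PriorityPhaserSketch(4).processAll("a lengthy string to be processed goes here");
        System.out.println("tasks processed: " + n);
    }
}
```

Each slot handles whichever pending string currently has the highest priority, so the order is only roughly by priority, which matches note 1 of the question. One caveat: a Phaser supports at most 65535 registered parties, and parties that only call arrive() stay registered, so this pattern as written caps the total number of tasks at that limit.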

Upvotes: 0
