Maarten Boekhold
Maarten Boekhold

Reputation: 877

Java producer/consumer, detecting end of processing

I'm preparing an application where a single producer generates several million tasks, which will then be processed by a configurable number of consumers. Communication from producer to consumer is (probably) going to be queue-based.

From the thread that runs the producer/generates the tasks, what method can I use to wait for completion of all tasks? I'd rather not resume to any periodic polling to see if my tasks queue is empty. In any case, the task queue being empty isn't actually a guarantee that the last tasks have completed. Those tasks can be relatively long-running, so it's quite possible that the queue is empty while the consumer threads are still happily processing.

Rgds, Maarten

Upvotes: 3

Views: 2157

Answers (4)

Nullpointer
Nullpointer

Reputation: 1086

You can use join() method for each thread ..so that till all the threads are done your main thread will not end! And by this way you can actually find out whether all the threads are done or not!

Upvotes: 0

b_erb
b_erb

Reputation: 21241

You might want to have a look at the java.util.concurrent package.

The executor framework already provides means to execute tasks via threadpool. The Future abstraction allows to wait for the completition of tasks.

Putting both together allows you coordinate the executions easily, decoupling tasks, activities (threads) and results.

Example:

    ExecutorService executorService = Executors.newFixedThreadPool(16);

    List<Callable<Void>> tasks = null;
    //TODO: fill tasks;

    //dispatch 
    List<Future<Void>> results =  executorService.invokeAll(tasks);

    //Wait until all tasks have completed
    for(Future<Void> result: results){
        result.get();
    }

Edit: Alternative Version using CountDownLatch

    ExecutorService executorService = Executors.newFixedThreadPool(16);

    final CountDownLatch latch;

    List<Callable<Void>> tasks = null;
    //TODO: fill tasks;

    latch = new CountDownLatch(tasks.size());

    //dispatch 
    executorService.invokeAll(tasks);

    //Wait until all tasks have completed
    latch.await();

And inside your tasks:

    Callable<Void> task = new Callable<Void>()
    {

        @Override
        public Void call() throws Exception
        {
            // TODO: do your stuff

            latch.countDown(); //<---- important part
            return null;
        }
    };

Upvotes: 3

You could have each consumer check to see if the queue is empty when they dequeue, and, if it is, pulse a condvar (or a Monitor, since I believe that's what Java has) on which the main thread is waiting.

Having the threads check a global boolean variable (marked as volatile) is a way to let the threads know that they should stop.

Upvotes: 1

Peter Lawrey
Peter Lawrey

Reputation: 533530

You want to know where every tasks completes. I would have another queue of completed task reports. (One object/message per task) When this count reaches the number of tasks you created, they have all completed. This task report can also have any errors and timing information for the task.

Upvotes: 1

Related Questions