dabadaba
dabadaba

Reputation: 9522

Waiting for all the tasks to finish

I have a series of different "tasks" to be done using the same thread pool. I want to measure the time it takes to perform each task, but for that I need to wait for every task in the "task" (sorry for ambiguity) to finish.

When there's just one task I would normally do this:

ExecutorService e = Executors.newCachedThreadPool();
for (int i=0; i<100; ++i)
    e.submit(target);
e.shutdown();
while (!e.isTerminated());

But since there will be several task submitted to the pool, I can't it down. All the methods that have something to do with waiting for the tasks to finish mention "after shutdown request". Well, what if I don't want to shut it down, but wait for all the threads to finish and then submit more tasks?

This is what I want to do:

ExecutorService e = Executors.newCachedThreadPool();
for (int i=0; i<100; ++i)
    e.submit(target);
// wait for all targets to finish
for (int i=0; i<100; ++i)
    e.submit(target); // submit different tasks
// wait... and so on

I thought of shutting the pool down and then "waking it up" again using prestartAllCoreThreads, but then I realized this was not an ExecutorService method but a ThreadPoolExecutor method. Could this be a solution? Shutting it down, waiting, and then activating the pool again? Seems a bit ugly to me.

I also thought that the most natural thing to do was to use a CyclicBarrier, but it seems too a specific way of doing this, while I think it would be the most logical thing to be able to use any ExecutorService for what I'm trying to do.

Is there any way I could stick to ExecutorServices and wait for all the tasks to finish?

Upvotes: 3

Views: 1172

Answers (3)

omu_negru
omu_negru

Reputation: 4770

Use CyclicBarrier for the work you need like so :

// the optionalRunnable can collect the data gathered by the tasks
CyclicBarrier b = new CyclicBarrier(numberOfTasks,optionalRunnable)

Task yourTaks = new Task(...., b);
// inside the run method call b.await() after the work is done;
executor.submit(yourTaks);

Optionally , you can also call await in the main thread and instantiate the barrier to numTasks + 1 . That way you are sure you're resubmitting tasks to the executor only after it's done processing the current batch

Upvotes: 2

lance-java
lance-java

Reputation: 27994

You could create a TaskListener interface which you pass into each task. Each task notifies the TaskListener when they start and stop. Then you can create a TimingTaskListener implementation which maintains a ConcurrentMap of the durations which can be queried later.

public interface TaskListener {
   void onStart(String taskId);
   void onEnd(String taskId);
}

public class Task implements Runnable {
   private TaskListener taskListener;
   private String taskId;

   public Task(String taskId, TaskListener taskListener) {
      this.taskId = taskId;
      this.listener = listener;
   }

   public void run() {
      listner.onStart(taskId);
      try {
         doStuff();
      } finally {
         listener.onEnd(taskId);
      }
   }
}

// TODO: Implement TimingTaskListener to save durations to a ConcurrentMap
TimingTaskListener timingListener = new TimingTaskListener(); 
Runnable task1 = new Task("task1", timingListener);
Runnable task2 = new Task("task2", timingListener);

Future<?> f1 = e.submit(task1);
Future<?> f2 = e.submit(task2);

// futures block until the task is finished.
// You could also use a CountDownLatch to achieve the same
f1.get();
f2.get();

long time1 = timingListener.getDuration("task1");
long time2 = timingListener.getDuration("task2");

Upvotes: 0

Zarathustra
Zarathustra

Reputation: 2943

You can await the termination of that ExecutorService.

    ExecutorService executor = Executors.newCachedThreadPool();    
   //do your stuff

    try {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        //handle
    }

Or use a CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

and within your task (enclose in try / finally)

latch.countDown();

Upvotes: 2

Related Questions