Victor P.

Reputation: 675

How to implement an ExecutorService to execute batches of tasks

I am looking for a way to execute batches of tasks in Java. The idea is to have an ExecutorService based on a thread pool that will allow me to spread a set of Callable tasks among different threads from a main thread. This class should provide a waitForCompletion method that puts the main thread to sleep until all tasks have executed. The main thread should then be woken up, perform some operations, and resubmit a new set of tasks.

This process will be repeated numerous times, so I would like to avoid using ExecutorService.shutdown, as this would require creating multiple instances of ExecutorService.

Currently I have implemented it in the following way, using an AtomicInteger and a Lock/Condition:

public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
  private final AtomicInteger mActiveCount;
  private final Lock          mLock;
  private final Condition     mCondition;

  public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
    ...
    for(C task : batch){
      submit(task);
      mActiveCount.incrementAndGet();
    }
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    mLock.lock();
    if (mActiveCount.decrementAndGet() == 0) {
      mCondition.signalAll();
    }
    mLock.unlock();
  }

  public void awaitBatchCompletion() throws InterruptedException {
    ...
    // Lock and wait until there is no active task
    mLock.lock();
    while (mActiveCount.get() > 0) {
      try {
        mCondition.await();
      } catch (InterruptedException e) {
        mLock.unlock();
        throw e;
      }
    }
    mLock.unlock();
  } 
}

Please note that I will not necessarily submit all the tasks of a batch at once, therefore CountDownLatch does not seem to be an option.

Is this a valid way to do it? Is there a more efficient/elegant way to implement that?

Thanks

Upvotes: 6

Views: 13055

Answers (3)

sharakan

Reputation: 6901

As the other answers point out, there doesn't seem to be any part of your use case that requires a custom ExecutorService.

It seems to me that all you need to do is submit a batch, wait for them all to finish while ignoring interrupts on the main thread, then submit another batch perhaps based on the results of the first batch. I believe this is just a matter of:

    ExecutorService service = ...;

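    Collection<Future<?>> futures = new HashSet<Future<?>>();
    for (Callable<?> callable : tasks) {
        Future<?> future = service.submit(callable);
        futures.add(future);
    }

    for (Future<?> future : futures) {
        try {
            // Blocks until this task has finished.
            future.get();
        } catch (InterruptedException e) {
            // Figure out if the interruption means we should stop.
        } catch (ExecutionException e) {
            // The task itself threw; e.getCause() holds the original exception.
        }
    }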

    // Use the results of futures to figure out a new batch of tasks.
    // Repeat the process with the same ExecutorService.

Upvotes: 3

Gray

Reputation: 116828

I agree with @ckuetbach that the default Java Executors should provide you with all of the functionality you need to execute a "batch" of jobs.

If I were you I would just submit a bunch of jobs, call ExecutorService.shutdown(), wait for them to finish with ExecutorService.awaitTermination(), and then start up a new ExecutorService for the next batch. Reusing a single pool just to save on "thread creations" is premature optimization unless you are doing this hundreds of times a second or something.
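A minimal sketch of that pool-per-batch approach, assuming a fixed pool of 4 threads and a one-minute timeout. The class name PoolPerBatch, the runBatch helper, and the trivial counting task are made up for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolPerBatch {

    // Hypothetical helper: runs taskCount trivial tasks on a fresh pool,
    // waits for them, and returns how many completed.
    static int runBatch(int taskCount) throws InterruptedException {
        final AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> completed.incrementAndGet());
        }
        // shutdown() stops new submissions but lets queued tasks run;
        // awaitTermination() only returns true after shutdown() was called.
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow(); // timed out: interrupt whatever is still running
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Each batch gets its own short-lived pool.
        for (int batch = 0; batch < 3; batch++) {
            System.out.println("batch " + batch + " completed " + runBatch(10) + " tasks");
        }
    }
}
```

Note that awaitTermination() on its own does not wait for anything useful: without the preceding shutdown() it simply blocks until the timeout expires.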

If you really are stuck on using the same ExecutorService for each of the batches then you can allocate a ThreadPoolExecutor yourself, and be in a loop looking at ThreadPoolExecutor.getActiveCount(). Something like:

BlockingQueue<Runnable> jobQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
    0L, TimeUnit.MILLISECONDS, jobQueue);
// submit your batch of jobs ...
// need to wait a bit for the jobs to start
Thread.sleep(100);
// keep waiting while any job is still running or still queued
while (executor.getActiveCount() > 0 || jobQueue.size() > 0) {
    // to slow the spin
    Thread.sleep(1000);
}
// continue on to submit the next batch

Upvotes: 0

Christian Kuetbach

Reputation: 16060

I think the ExecutorService itself will be able to meet your requirements.

Call invokeAll([...]) with your tasks and iterate over the Futures it returns. invokeAll blocks until every task has completed, so once you can iterate through all the Futures, all tasks are finished.
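A rough sketch of how this could look when the same pool is reused across batches. The class name InvokeAllBatches, the runBatch and squares helpers, and the squaring task are assumptions made up for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllBatches {

    // Runs one batch on the given pool and returns the results in submission order.
    static List<Integer> runBatch(ExecutorService pool, List<Callable<Integer>> batch)
            throws InterruptedException, ExecutionException {
        // invokeAll blocks until every task in the batch has completed.
        List<Future<Integer>> futures = pool.invokeAll(batch);
        List<Integer> results = new ArrayList<Integer>();
        for (Future<Integer> future : futures) {
            // The task is already done, so get() does not block here;
            // it throws ExecutionException if the task itself failed.
            results.add(future.get());
        }
        return results;
    }

    // Hypothetical helper: builds a batch of tasks that each square one input.
    static List<Callable<Integer>> squares(List<Integer> inputs) {
        List<Callable<Integer>> batch = new ArrayList<Callable<Integer>>();
        for (final Integer n : inputs) {
            batch.add(() -> n * n);
        }
        return batch;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        try {
            // The second batch is built from the results of the first one.
            List<Integer> first = runBatch(pool, squares(Arrays.asList(1, 2, 3)));
            List<Integer> second = runBatch(pool, squares(first));
            System.out.println(first + " " + second); // prints "[1, 4, 9] [1, 16, 81]"
        } finally {
            pool.shutdown(); // only once, after the last batch
        }
    }
}
```

One caveat: invokeAll takes the whole batch in a single call, so it fits less well if the tasks of one batch are submitted piecemeal.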

Upvotes: 8
