Reputation: 675
I am looking for a way to execute batches of tasks in Java. The idea is to have an ExecutorService based on a thread pool that lets me spread a set of Callable tasks across different threads from a main thread. This class should provide a waitForCompletion method that puts the main thread to sleep until all tasks have executed. The main thread should then be awakened, perform some operations, and resubmit a set of tasks.
This process will be repeated numerous times, so I would like to avoid using ExecutorService.shutdown, as this would require creating multiple instances of ExecutorService.
Currently I have implemented it in the following way, using an AtomicInteger and a Lock/Condition:
    public class BatchThreadPoolExecutor extends ThreadPoolExecutor {

        private final AtomicInteger mActiveCount;
        private final Lock mLock;
        private final Condition mCondition;

        public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch) {
            ...
            for (C task : batch) {
                // Count the task before submitting it; otherwise it could finish
                // and be decremented in afterExecute before the increment happens.
                mActiveCount.incrementAndGet();
                submit(task);
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            mLock.lock();
            try {
                if (mActiveCount.decrementAndGet() == 0) {
                    mCondition.signalAll();
                }
            } finally {
                mLock.unlock();
            }
        }

        public void awaitBatchCompletion() throws InterruptedException {
            ...
            // Lock and wait until there is no active task
            mLock.lock();
            try {
                while (mActiveCount.get() > 0) {
                    mCondition.await();
                }
            } finally {
                // Always release the lock, including when await() is interrupted
                mLock.unlock();
            }
        }
    }
Please note that I will not necessarily submit all the tasks of a batch at once, so CountDownLatch does not seem to be an option.
Is this a valid way to do it? Is there a more efficient or elegant way to implement it?
Thanks
Upvotes: 6
Views: 13055
Reputation: 6901
As the other answers point out, there doesn't seem to be any part of your use case that requires a custom ExecutorService.
It seems to me that all you need to do is submit a batch, wait for them all to finish while ignoring interrupts on the main thread, then submit another batch perhaps based on the results of the first batch. I believe this is just a matter of:
    ExecutorService service = ...;
    Collection<Future<?>> futures = new HashSet<>();
    for (Callable<?> callable : tasks) {
        futures.add(service.submit(callable));
    }
    for (Future<?> future : futures) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Figure out if the interruption means we should stop.
        } catch (ExecutionException e) {
            // The task itself threw; e.getCause() holds the original exception.
        }
    }
    // Use the results of futures to figure out a new batch of tasks.
    // Repeat the process with the same ExecutorService.
Upvotes: 3
Reputation: 116828
I agree with @ckuetbach that the default Java Executors should provide you with all of the functionality you need to execute a "batch" of jobs.
If I were you, I would just submit a bunch of jobs, call ExecutorService.shutdown(), wait for them to finish with ExecutorService.awaitTermination(), and then start up a new ExecutorService for the next batch. Reusing the pool just to save on thread creation is premature optimization unless you are doing this hundreds of times a second or something.
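A minimal sketch of that pool-per-batch approach, assuming a fixed pool of 4 threads and a one-minute timeout. The class name PoolPerBatch, the runBatch helper, and the trivial counter task are hypothetical and only stand in for real work:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolPerBatch {

    // Runs taskCount trivial tasks on a fresh pool and returns how many completed.
    static int runBatch(int taskCount) throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> completed.incrementAndGet()); // placeholder for real work
        }
        // shutdown() stops new submissions; tasks already submitted still run.
        pool.shutdown();
        // awaitTermination() only returns true after shutdown() has been requested
        // and every task has finished.
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow();
            throw new IllegalStateException("batch did not finish in time");
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Each batch gets its own short-lived pool.
        for (int batch = 1; batch <= 3; batch++) {
            System.out.println("batch " + batch + " completed " + runBatch(10) + " tasks");
        }
    }
}
```

Because the pool is discarded after each batch, there is no shared state to reset between batches; the cost is re-creating the worker threads every time.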
If you really are stuck on using the same ExecutorService for each of the batches, then you can allocate a ThreadPoolExecutor yourself and loop, polling ThreadPoolExecutor.getActiveCount(). Something like:
    BlockingQueue<Runnable> jobQueue = new LinkedBlockingQueue<>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
            0L, TimeUnit.MILLISECONDS, jobQueue);
    // submit your batch of jobs ...
    // need to wait a bit for the jobs to start
    Thread.sleep(100);
    // keep waiting while any job is still running or still queued
    // (getActiveCount() is only an approximate count)
    while (executor.getActiveCount() > 0 || !jobQueue.isEmpty()) {
        // to slow the spin
        Thread.sleep(1000);
    }
    // continue on to submit the next batch
Upvotes: 0
Reputation: 16060
I think the ExecutorService itself can meet your requirements.
Call invokeAll([...]) with all of your tasks and iterate over the Futures it returns. invokeAll blocks until every task has completed, so once you can iterate through all the Futures, all tasks are finished.
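As a rough sketch of how that could look when the same pool is reused across batches: the class name InvokeAllBatches, the helpers runBatch and squares, and the squaring workload are all hypothetical and only illustrate the pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllBatches {

    // Runs one batch on the given pool and returns the sum of the results.
    // invokeAll blocks until every task in the batch has completed.
    static int runBatch(ExecutorService pool, List<Callable<Integer>> batch)
            throws InterruptedException, ExecutionException {
        int sum = 0;
        for (Future<Integer> future : pool.invokeAll(batch)) {
            sum += future.get(); // task is already done, so get() returns immediately
        }
        return sum;
    }

    // Hypothetical workload: one task per number in [from, to], each squaring its input.
    static List<Callable<Integer>> squares(int from, int to) {
        List<Callable<Integer>> batch = new ArrayList<>();
        for (int i = from; i <= to; i++) {
            final int n = i;
            batch.add(() -> n * n);
        }
        return batch;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        try {
            int first = runBatch(pool, squares(1, 3));
            // The second batch is built from the result of the first one,
            // and runs on the same pool without any shutdown in between.
            int second = runBatch(pool, squares(first, first + 1));
            System.out.println(first + " " + second);
        } finally {
            pool.shutdown(); // only once, when no more batches will follow
        }
    }
}
```

Note that invokeAll takes the whole batch at once, so this fits only when all tasks of a batch are known up front.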
Upvotes: 8