Reputation: 36823
I have the following function, in pseudo-code:
Result calc(Data data) {
    if (data.isFinal()) {
        return new Result(data); // This is the actual lengthy calculation
    } else {
        List<Result> results = new ArrayList<Result>();
        for (int i = 0; i < data.numOfSubTasks(); ++i) {
            results.add(calc(data.subTask(i)));
        }
        return new Result(results); // merge all results into a single result
    }
}
I want to parallelize it, using a fixed number of threads.
My first attempt was:
ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
Result calc(final Data data) throws InterruptedException {
    if (data.isFinal()) {
        return new Result(data); // This is the actual lengthy calculation
    } else {
        // Sub-tasks add their results concurrently, so the list must be thread-safe
        final List<Result> results = Collections.synchronizedList(new ArrayList<Result>());
        List<Callable<Void>> callables = new ArrayList<Callable<Void>>();
        for (int i = 0; i < data.numOfSubTasks(); ++i) {
            final int index = i; // an anonymous class can only capture final locals
            callables.add(new Callable<Void>() {
                public Void call() throws InterruptedException {
                    results.add(calc(data.subTask(index)));
                    return null;
                }
            });
        }
        executorService.invokeAll(callables); // wait for all sub-tasks to complete
        return new Result(results); // merge all results into a single result
    }
}
However, this quickly deadlocked: while the top recursion level waits for all of its sub-tasks to finish, the inner levels are also waiting for threads to become available, so every thread in the pool ends up blocked.
How can I efficiently parallelize my program without deadlocks?
Upvotes: 2
Views: 2174
Reputation: 22446
Your problem is a general design problem when using ThreadPoolExecutor for tasks with dependencies.
I see two options:
1) Make sure to submit tasks in bottom-up order, so that a running task never depends on a task that has not started yet.
2) Use the "direct handoff" strategy (see the ThreadPoolExecutor documentation):
ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
The idea is to use a synchronous queue so that tasks never wait in a real queue. The rejection handler takes care of tasks that have no available thread to run on. With this particular handler, the submitting thread runs the rejected task itself.
With this configuration, tasks are never dropped (as long as the executor has not been shut down), and inter-task dependencies cannot cause a deadlock, because a task that finds no free thread runs in the submitting thread instead of waiting in a queue.
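To make the direct-handoff idea concrete, here is a minimal sketch of the recursive calculation running on such an executor. The `Task` class, the `tree` helper, and the "sum of the leaves" merge are hypothetical stand-ins for the question's `Data` and `Result` types, which are not shown in the thread; only the executor configuration comes from the answer above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectHandoffSketch {
    // Hypothetical stand-in for Data: a leaf carries a value, an inner node has sub-tasks.
    static final class Task {
        final long value;
        final List<Task> subTasks = new ArrayList<>();
        Task(long value) { this.value = value; }
        boolean isFinal() { return subTasks.isEmpty(); }
    }

    // Same configuration as in the answer: no real queue, caller runs rejected tasks.
    static ExecutorService newExecutor(int poolSize) {
        return new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    static long calc(ExecutorService executor, Task task) {
        if (task.isFinal()) {
            return task.value; // placeholder for the lengthy calculation
        }
        List<Callable<Long>> callables = new ArrayList<>();
        for (Task sub : task.subTasks) {
            callables.add(() -> calc(executor, sub));
        }
        try {
            long sum = 0; // placeholder merge: add up the sub-results
            // A sub-task either gets a free pool thread or runs right here in the caller.
            for (Future<Long> future : executor.invokeAll(callables)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical helper: a complete tree whose leaves all carry the value 1.
    static Task tree(int depth, int fanOut) {
        Task task = new Task(1);
        for (int i = 0; depth > 0 && i < fanOut; i++) {
            task.subTasks.add(tree(depth - 1, fanOut));
        }
        return task;
    }

    public static void main(String[] args) {
        ExecutorService executor = newExecutor(2);
        try {
            System.out.println(calc(executor, tree(3, 3))); // prints 27 (3^3 leaves)
        } finally {
            executor.shutdown();
        }
    }
}
```

Note that the same tree would hang on a plain `Executors.newFixedThreadPool(2)` once both threads block in `invokeAll`. Here a blocked level simply does the work itself, so the pool size limits parallelism but never progress.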
Upvotes: 5
Reputation: 1171
You should split your approach into two phases:
To do that, you can use Futures
to make the results asynchronous. That means every result of calc will be of type Future<Result>.
Returning a Future immediately frees the current thread and leaves room for other tasks to be processed. When collecting the results (new Result(results)), you wait for all of them to be ready (the scatter-gather pattern; you can use a semaphore to wait for all results). The collection itself walks the tree, and the checking (or waiting for results to arrive) happens in a single thread.
Overall, you build a tree of Futures that is used to collect the results, and only the "expensive" operations run in the thread pool.
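The two phases could look roughly like the sketch below. It is only an illustration under assumptions: the `Node` class is hypothetical, `depth` and `fanOut` stand in for `data.isFinal()` and `data.subTask(i)`, and the merge is a plain sum. It also waits with `Future.get()` rather than a semaphore, which is a simplification of what is described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureTreeSketch {
    // Hypothetical tree node: a leaf holds a pending result, an inner node holds children.
    static final class Node {
        final Future<Long> leaf; // non-null only for leaves
        final List<Node> children = new ArrayList<>();
        Node(Future<Long> leaf) { this.leaf = leaf; }
    }

    // Phase 1 (scatter): walk the task tree and submit only the leaf work.
    // Nothing blocks here, so pool threads never wait on each other.
    static Node submit(ExecutorService pool, int depth, int fanOut) {
        if (depth == 0) {
            return new Node(pool.submit(() -> 1L)); // placeholder for the lengthy calculation
        }
        Node inner = new Node(null);
        for (int i = 0; i < fanOut; i++) {
            inner.children.add(submit(pool, depth - 1, fanOut));
        }
        return inner;
    }

    // Phase 2 (gather): a single thread walks the tree and waits for each leaf.
    static long collect(Node node) {
        if (node.leaf != null) {
            try {
                return node.leaf.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
        long sum = 0; // placeholder merge: add up the children
        for (Node child : node.children) {
            sum += collect(child);
        }
        return sum;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(collect(submit(pool, 3, 3))); // prints 27 (3^3 leaves)
        } finally {
            pool.shutdown();
        }
    }
}
```

Because only the calling thread ever blocks, an ordinary fixed thread pool with a regular queue is enough here. The trade-off is that the merge step runs in one thread, which is fine as long as merging is cheap compared to the leaf calculations.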
Upvotes: 0