Reputation: 3470
I adopted the concurrency strategy from this post. However, mine looks like this:
ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CREATE_KNOWLEDGE_THREADS);
List<Callable<Collection<Triple>>> todo = new ArrayList<Callable<Collection<Triple>>>(this.patternMappingList.size());
for (PatternMapping mapping : this.patternMappingList) {
    todo.add(new CreateKnowledgeCallable(mapping, i++));
}
try {
    List<Future<Collection<Triple>>> answers = executorService.invokeAll(todo);
    for (Future<Collection<Triple>> future : answers) {
        Collection<Triple> triples = future.get();
        this.writeNTriplesFile(triples);
    }
}
catch (InterruptedException e) { ... }
catch (ExecutionException e) { ... }
executorService.shutdown();
executorService.shutdownNow();
But the ExecutorService never shuts down. I tried to debug how many of the CreateKnowledgeCallable instances have finished, but this number seems to vary from run to run (eventually no new threads/callables are started, yet the service keeps running). I am sure I logged and printed every possible exception, but I can't see one happening. It also seems that after a while nothing happens anymore, except that NUMBER_OF_CREATE_KNOWLEDGE_THREADS CPUs are spinning at 100% forever. What am I doing wrong? If you need more specific information, I would be happy to provide it!
Kind regards, Daniel
Upvotes: 2
Views: 1066
Reputation: 3470
Anyone with this sort of problem should try implementing the same algorithm without concurrency. Doing that, I found that a component was throwing a runtime exception which was being swallowed.
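For anyone who wants to try this, here is a minimal sketch of what I mean by running without concurrency: call each Callable directly on the current thread and collect whatever it throws. The class and method names (SequentialDebugRun, runSequentially) are made up for illustration and are not part of my original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class SequentialDebugRun {

    // Calls every task on the current thread, one after another, and returns
    // the exceptions that were thrown so that none of them gets lost.
    static <T> List<Exception> runSequentially(List<Callable<T>> tasks) {
        List<Exception> failures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            try {
                task.call();
            } catch (Exception e) {
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "ok");
        tasks.add(() -> { throw new IllegalStateException("hidden failure"); });
        // Print the full stack trace of every failure.
        for (Exception e : runSequentially(tasks)) {
            e.printStackTrace();
        }
    }
}
```

With no pool involved, a debugger breakpoint or a stack trace points straight at the failing component.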
Upvotes: 0
Reputation: 6532
Are you sure that your submitted tasks actually finish? If you check the API for shutdownNow() and shutdown(), you'll see that they do not guarantee termination.
Have you tried using a call to awaitTermination(long timeout, TimeUnit unit) with a reasonable amount of time as timeout parameter? (edit: "reasonable amount of time" depends of course on the mean process time of your tasks as well as the number of tasks executing at the time you call for termination)
Edit2: I hope the following example from my own code helps (note that it probably isn't the optimal, or most graceful, way to solve this problem):
try {
    this.started = true;
    pool.execute(new QueryingAction(pcqs));
    for (;;) {
        MyObj p = bq.poll(timeout, TimeUnit.MINUTES); // poll from a blocking queue
        if (p != null) {
            if (p.getId().equals("0"))
                break;
            pool.submit(new AnalysisAction(ds, p, analyzedObjs));
        } else
            drc.log("Timed out while waiting...");
    }
} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    drc.log("--DEBUG: Termination criteria found, shutdown initiated..");
    pool.shutdown();
    int mins = 2;
    int nCores = poolSize - 1;
    long totalTasks = pool.getTaskCount(),
         compTasks = pool.getCompletedTaskCount(),
         tasksRemaining = totalTasks - compTasks,
         timeout = mins * tasksRemaining / nCores;
    drc.log("--DEBUG: Shutdown commenced, thread pool will terminate once all objects are processed, " +
            "or will timeout in : " + timeout + " minutes... \n" + compTasks + " of " + (totalTasks - 1) +
            " objects have been analyzed so far, " + "mean process time is: " +
            drc.getMeanProcTimeAsString() + " milliseconds.");
    pool.awaitTermination(timeout, TimeUnit.MINUTES);
}
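Stripped of my application-specific details, the same idea (shutdown(), wait with a timeout, then fall back to shutdownNow()) can be sketched roughly as below. The helper name shutdownAndAwait and the 5-second timeout are my own choices for illustration; pick a timeout that fits your tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolShutdown {

    // Returns true if the pool terminated within the given waiting periods.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; already submitted tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks and discard queued ones
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("task done"));
        boolean terminated = shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        System.out.println("terminated: " + terminated);
    }
}
```

If this returns false, at least one task is ignoring the interrupt, which is worth logging since the pool's threads will keep running.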
Upvotes: 0
Reputation: 5776
executorService.invokeAll should return only when all tasks are finished, and the same goes for future.get(). Are you sure that the call to executorService.invokeAll(todo); ever returns, rather than blocking forever while it waits for the tasks to complete?
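One way to check this is the timed overload invokeAll(tasks, timeout, unit), which returns once the timeout expires and cancels the tasks that have not completed. A rough sketch, where the class name, the helper countCancelled and the timeout values are assumptions of mine for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BoundedInvokeAll {

    // Runs the tasks with an upper time bound and counts how many were
    // cancelled because they had not completed when the timeout expired.
    static <T> int countCancelled(ExecutorService pool, List<Callable<T>> tasks,
                                  long timeout, TimeUnit unit) {
        try {
            List<Future<T>> futures = pool.invokeAll(tasks, timeout, unit);
            int cancelled = 0;
            for (Future<T> future : futures) {
                if (future.isCancelled()) {
                    cancelled++;
                }
            }
            return cancelled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "fast");
        tasks.add(() -> { Thread.sleep(10_000); return "slow"; }); // stand-in for a stuck task
        int cancelled = countCancelled(pool, tasks, 500, TimeUnit.MILLISECONDS);
        System.out.println("cancelled tasks: " + cancelled);
        pool.shutdownNow();
    }
}
```

A non-zero count tells you some tasks never finish on their own. Note that cancellation only interrupts the worker thread, so a task that ignores interrupts will keep its CPU busy even after invokeAll has returned.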
Upvotes: 0
Reputation: 533442
When you perform a shutdownNow() it interrupts all the threads in the pool. However, if your code ignores interrupts, they won't stop. You need to make your tasks honour interrupts with tests like
while (!Thread.currentThread().isInterrupted()) {
    // do one unit of work
}
or
Thread.sleep(0);
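To make that concrete, here is a small self-contained sketch of a CPU-bound task that checks the interrupt flag and therefore stops when shutdownNow() is called. The class and method names and the 5-second wait are assumptions for illustration; the counter just stands in for real work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class InterruptibleWork {

    // A CPU-bound loop that checks the interrupt flag on every iteration.
    static Runnable spinUntilInterrupted(AtomicLong iterations) {
        return () -> {
            while (!Thread.currentThread().isInterrupted()) {
                iterations.incrementAndGet(); // stand-in for one unit of real work
            }
        };
    }

    // Starts the loop, interrupts it via shutdownNow(), and reports
    // whether the pool terminated within five seconds.
    static boolean stopsAfterShutdownNow() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(spinUntilInterrupted(new AtomicLong()));
        pool.shutdownNow();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("pool terminated: " + stopsAfterShutdownNow());
    }
}
```

If you replace the loop condition with while (true), the same pool never terminates and the thread spins at 100% CPU, which matches the symptom in the question.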
Upvotes: 3