Daniel Gerber

Reputation: 3470

ExecutorService never stops without Exceptions

I adopted the concurrency strategy from this post. However, mine looks like this:

ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CREATE_KNOWLEDGE_THREADS);
List<Callable<Collection<Triple>>> todo = new ArrayList<Callable<Collection<Triple>>>(this.patternMappingList.size());
int i = 0; // running task index; its declaration was not shown in the original post
for (PatternMapping mapping : this.patternMappingList) {
    todo.add(new CreateKnowledgeCallable(mapping, i++));
}
try {

    List<Future<Collection<Triple>>> answers = executorService.invokeAll(todo);
    for (Future<Collection<Triple>> future : answers) {

        Collection<Triple> triples = future.get();
        this.writeNTriplesFile(triples);
    }    
}
catch (InterruptedException e) { ... }
catch (ExecutionException e) { ... }

executorService.shutdown();
executorService.shutdownNow();

But the ExecutorService never shuts down. I tried to debug how many of the CreateKnowledgeCallable instances finish, but this number seems to vary (even after no new threads/callables are being executed, while the service keeps running). I am sure I logged and printed every possible exception, but I can't see one happening. It also seems that after a while nothing happens anymore, except that NUMBER_OF_CREATE_KNOWLEDGE_THREADS CPUs are spinning at 100% forever. What am I doing wrong? If you need more specific information, I would be happy to provide it!

Kind regards, Daniel

Upvotes: 2

Views: 1066

Answers (4)

Daniel Gerber

Reputation: 3470

Anyone with this sort of problem should try implementing the same algorithm without concurrency. Doing so, I found that a component had thrown a runtime exception which was swallowed.
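For anyone who wants to keep the concurrent version while hunting for a hidden exception, here is a minimal sketch. It assumes the failure happens inside the Callable itself; the class and method names (SurfaceTaskFailures, runAndCollectFailures) are made up for illustration and are not from my real code. Each future.get() is wrapped in its own try/catch, so one failing task does not hide the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: collects the failure of every task instead of stopping at the first.
class SurfaceTaskFailures {

    static List<String> runAndCollectFailures(List<Callable<Integer>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> failures = new ArrayList<>();
        try {
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    // The exception thrown inside the task is available as the cause.
                    failures.add(String.valueOf(e.getCause()));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        } finally {
            pool.shutdown();
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        System.out.println(runAndCollectFailures(tasks));
    }
}
```

Note that this only helps when the exception escapes the task. If some component catches and discards it internally, as in my case, running the code single-threaded under a debugger is probably still the quickest way to find it.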

Upvotes: 0

posdef

Reputation: 6532

Are you sure that your submitted tasks actually finish? If you check the API for shutdownNow() and shutdown(), you'll see that they do not guarantee termination.

Have you tried using a call to awaitTermination(long timeout, TimeUnit unit) with a reasonable amount of time as timeout parameter? (edit: "reasonable amount of time" depends of course on the mean process time of your tasks as well as the number of tasks executing at the time you call for termination)

Edit2: I hope the following example from my own code helps (note that it probably isn't the optimal, or most graceful, way to solve this problem):

try {
    this.started = true;
    pool.execute(new QueryingAction(pcqs));
    for (;;) {
        MyObj p = bq.poll(timeout, TimeUnit.MINUTES); // poll from a blocking queue

        if (p != null) {
            if (p.getId().equals("0"))
                break;

            pool.submit(new AnalysisAction(ds, p, analyzedObjs));
        } else {
            drc.log("Timed out while waiting...");
        }
    }
} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    drc.log("--DEBUG: Termination criteria found, shutdown initiated..");
    pool.shutdown();

    int mins = 2;
    int nCores = poolSize - 1;
    long totalTasks = pool.getTaskCount(),
         compTasks = pool.getCompletedTaskCount(),
         tasksRemaining = totalTasks - compTasks,
         timeout = mins * tasksRemaining / nCores;

    drc.log("--DEBUG: Shutdown commenced, thread pool will terminate once all objects are processed, " +
            "or will timeout in : " + timeout + " minutes... \n" + compTasks + " of " + (totalTasks - 1) +
            " objects have been analyzed so far, " + "mean process time is: " +
            drc.getMeanProcTimeAsString() + " milliseconds.");

    pool.awaitTermination(timeout, TimeUnit.MINUTES);
}
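A more compact version of the same idea, as a rough sketch: call shutdown(), wait with awaitTermination(), and fall back to shutdownNow() if the wait times out. The helper name shutdownAndAwait and the 5-second timeout are placeholders I picked for illustration; choose a timeout that fits your tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing a two-phase shutdown.
class GracefulShutdown {

    // Returns true if the pool terminated within the two waiting periods.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // no new tasks are accepted; already submitted tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // try to cancel whatever is still running
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("short task"));
        System.out.println("terminated: " + shutdownAndAwait(pool, 5, TimeUnit.SECONDS));
    }
}
```

If this returns false, at least one task is still running and is not responding to interruption, which would match the symptom of CPUs spinning forever.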

Upvotes: 0

Nikem

Reputation: 5776

executorService.invokeAll

should return only when all tasks are finished, and the same goes for future.get(). Are you sure that the call to executorService.invokeAll(todo) ever returns, rather than blocking forever while waiting for the tasks to complete?
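One way to check this is the invokeAll overload that takes a timeout: it returns once the timeout expires and cancels the tasks that have not completed. The sketch below is only an illustration; the class name BoundedInvokeAll, the pool size, and the 500 ms timeout are arbitrary choices of mine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: counts the tasks that did not finish within the timeout.
class BoundedInvokeAll {

    static int countCancelled(List<Callable<String>> tasks, long timeout, TimeUnit unit) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        int cancelled = 0;
        try {
            // Returns when all tasks are done or the timeout expires, whichever comes first.
            List<Future<String>> futures = pool.invokeAll(tasks, timeout, unit);
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    cancelled++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return cancelled;
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "fast");
        tasks.add(() -> { Thread.sleep(60_000); return "slow"; });
        System.out.println("cancelled: " + countCancelled(tasks, 500, TimeUnit.MILLISECONDS));
    }
}
```

A non-zero count tells you some tasks never completed. Keep in mind that cancelling only interrupts the worker thread, so a task that ignores interrupts will keep running in the background.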

Upvotes: 0

Peter Lawrey

Reputation: 533442

When you call shutdownNow(), it interrupts all the threads in the pool. However, if your code ignores interrupts, those threads won't stop. You need to make your tasks honour interrupts with checks like

while (!Thread.currentThread().isInterrupted()) {

}

or

Thread.sleep(0);
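Put together, a minimal sketch of a CPU-bound task that stops when the pool is shut down could look like this. The class name InterruptibleTask and the counter are placeholders for illustration; the loop body stands in for your real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo: a busy loop that checks the interrupt flag on every pass.
class InterruptibleTask {

    // Returns true if the pool terminates after shutdownNow().
    static boolean stopsAfterShutdownNow() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicLong iterations = new AtomicLong();

        pool.execute(() -> {
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                iterations.incrementAndGet(); // stand-in for real work
            }
        });

        try {
            started.await();    // make sure the task is running before shutting down
            pool.shutdownNow(); // interrupts the worker thread
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + stopsAfterShutdownNow());
    }
}
```

If you replace the loop condition with true, the same program should keep one CPU busy and awaitTermination should time out, which is the behaviour described in the question.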

Upvotes: 3
