Achu S

Reputation: 171

How to stop all the running threads, if one of those throws an Exception?

In one of my applications I'm using an ExecutorService to create a fixed thread pool and a CountDownLatch to wait for the threads to complete. This works fine as long as no task throws an exception. If an exception occurs in any of the threads, I need to stop all the running threads and report the error to the main thread. Can anyone please help me solve this?

This is the sample code I'm using for executing multiple threads.

    private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);
        try
        {
            CountDownLatch latch = new CountDownLatch(noOfThreads);
            for (int i = 0; i < noOfThreads; i++) {
                executor.submit(new ThreadExecutor(latch));
            }
            latch.await(); // blocks forever if a task throws before counting down
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            executor.shutdown();
        }
    }

This is the Executor Class

    public class ThreadExecutor implements Callable<String> {
        CountDownLatch latch;

        public ThreadExecutor(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public String call() throws Exception
        {
            doMyTask(); // process logic goes here!
            this.latch.countDown(); // skipped if doMyTask() throws
            return "Success";
        }
    }

=============================================================================

Thank you all :)

I have corrected my class as given below and that is working now.

    private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);
        // Future<String> matches ThreadExecutor, which implements Callable<String>
        ArrayList<Future<String>> futureList = new ArrayList<Future<String>>(noOfThreads);
        try
        {
            userContext = BSF.getMyContext();
            CountDownLatch latch = new CountDownLatch(noOfComponentsToImport);

            for (ImportContent artifact : artifactList) {
                futureList.add(executor.submit(new ThreadExecutor(latch)));
            }

            latch.await();

            // All tasks have counted down; get() rethrows any task failure
            for (Future<String> future : futureList)
            {
                try
                {
                    future.get();
                }
                catch (ExecutionException e)
                {
                    //handle it
                }
            }
        }
        catch (Exception e)
        {
            //handle it
        }
        finally
        {
            executor.shutdown();

            try
            {
                executor.awaitTermination(90000, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e)
            {
                //handle it
            }
        }
    }

Executor Class :

    public class ThreadExecutor implements Callable<String> {
        // Shared by all tasks: set once any task fails
        private static volatile boolean isAnyError;
        CountDownLatch latch;

        public ThreadExecutor(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public String call() throws Exception
        {
            try
            {
                if (!isAnyError)
                {
                    doMyTask(); // process logic goes here!
                }
            }
            catch (Exception e)
            {
                isAnyError = true;
                throw e;
            }
            finally
            {
                // Always count down so latch.await() cannot hang
                this.latch.countDown();
            }
            return "Success";
        }
    }

Upvotes: 15

Views: 11183

Answers (4)

artbristol

Reputation: 32397

Use an ExecutorCompletionService, complete with an ExecutorService that outlives the duration of the tasks (i.e. it doesn't get shut down afterwards):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Threader {

    static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        new Threader().start();
        service.shutdown();
    }

    private void start() {
        CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
                service);
        /*
         * Holds all the futures for the submitted tasks
         */
        List<Future<Void>> results = new ArrayList<Future<Void>>();

        for (int i = 0; i < 3; i++) {
            final int callableNumber = i;

            results.add(completionService.submit(new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
                     System.out.println("Task " + callableNumber
                             + " in progress");
                     try {
                         Thread.sleep(callableNumber * 1000);
                     } catch (InterruptedException ex) {
                         System.out.println("Task " + callableNumber
                                 + " cancelled");
                         return null;
                     }
                     if (callableNumber == 1) {
                         throw new Exception("Wrong answer for task "
                                 + callableNumber);
                     }
                     System.out.println("Task " + callableNumber + " complete");
                     return null;
                 }
             }));
        }

        boolean complete = false;
        while (!complete) {
            complete = true;
            Iterator<Future<Void>> futuresIt = results.iterator();
            while (futuresIt.hasNext()) {
                if (futuresIt.next().isDone()) {
                    futuresIt.remove();
                } else {
                    complete = false;
                }
            }

            if (!results.isEmpty()) {
                try {
                    /*
                     * Examine results of next completed task
                     */
                    completionService.take().get();
                } catch (InterruptedException e) {
                    /*
                     * Give up - interrupted.
                     */
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    /*
                     * The task threw an exception
                     */
                    System.out.println("Execution exception " + e.getMessage());
                    complete = true;
                    for (Future<Void> future : results) {
                        if (!future.isDone()) {
                            System.out.println("Cancelling " + future);
                            future.cancel(true);
                        }
                    }
                }
            }
        }

    }
}

Output is something like:

Task 0 in progress
Task 2 in progress
Task 1 in progress
Task 0 complete
Execution exception java.lang.Exception: Wrong answer for task 1
Cancelling java.util.concurrent.FutureTask@a59698
Task 2 cancelled

where Task 2 got cancelled due to the failure of Task 1.

Upvotes: 5

Cratylus

Reputation: 54074

I think you need to restructure your code. Take a look at ExecutorService#invokeAny:

Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do. Upon normal or exceptional return, tasks that have not completed are cancelled. The results of this method are undefined if the given collection is modified while this operation is in progress.

This seems to be the behavior you need. You also don't need a CountDownLatch, because the main thread blocks inside invokeAny.
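
A minimal sketch of how invokeAny behaves, using two made-up tasks (the class name, task bodies, and pool size are illustrative assumptions, not part of the question). Note that invokeAny returns as soon as one task succeeds and throws an ExecutionException only if every task fails, so it fits when a single successful result is enough:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnyDemo {

    // Runs two hypothetical tasks and returns the result of the one that succeeds.
    static String firstSuccess() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            // Task A always fails (stand-in for a task that throws)
            tasks.add(() -> { throw new IllegalStateException("task A failed"); });
            // Task B succeeds
            tasks.add(() -> "task B result");

            // Blocks until one task succeeds; unfinished tasks are cancelled on return
            return executor.invokeAny(tasks);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstSuccess());
    }
}
```

Because the call itself blocks, no latch is involved. If all tasks must finish unless one fails, this method alone may not be enough.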

Upvotes: 1

yegor256

Reputation: 105043

I think you will need one more thread, call it a "Watcher", which checks an AtomicBoolean until it becomes true. Once it is set, you shut down the main executor service. Keep in mind that the shutdown mechanism doesn't guarantee an immediate stop of all threads. Read this, for example: Graceful shutdown of threads and executor
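
One way the watcher idea could look, as a rough sketch. The class name, the polling interval, the failing task, and the choice of shutdownNow() (which interrupts running tasks) are all assumptions made for illustration; tasks only stop promptly if they respond to interruption:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WatcherDemo {

    // Returns true if a failure was seen and the pool then terminated.
    static boolean runAndWatch() throws InterruptedException {
        final AtomicBoolean failed = new AtomicBoolean(false);
        final ExecutorService workers = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 3; i++) {
            final int id = i;
            workers.execute(() -> {
                try {
                    if (id == 1) {
                        throw new IllegalStateException("task 1 failed");
                    }
                    Thread.sleep(10_000); // stand-in for long-running work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stopped by the watcher
                } catch (RuntimeException e) {
                    failed.set(true); // signal the watcher
                }
            });
        }
        workers.shutdown(); // accept no further tasks

        // The "Watcher": polls the flag and interrupts the workers once it is set
        Thread watcher = new Thread(() -> {
            try {
                while (!workers.isTerminated()) {
                    if (failed.get()) {
                        workers.shutdownNow();
                        return;
                    }
                    Thread.sleep(50);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        watcher.start();
        watcher.join();

        boolean terminated = workers.awaitTermination(5, TimeUnit.SECONDS);
        return failed.get() && terminated;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("failure detected and pool stopped: " + runAndWatch());
    }
}
```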

Upvotes: 1

Marko Topolnik

Reputation: 200138

I strongly suggest you use a robust mechanism to count down the latch: wrap the whole task in an all-encompassing try-finally { latch.countDown(); }. Detect errors in threads using a separate mechanism.
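
A small sketch of that pattern, assuming an AtomicReference as the separate error channel (the class name, the failing task, and the AtomicReference choice are illustrative, not something the question prescribes):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class LatchDemo {

    // Runs n tasks and returns the first recorded exception, or null if none failed.
    static Exception runAll(int n) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(n);
        final CountDownLatch latch = new CountDownLatch(n);
        final AtomicReference<Exception> firstError = new AtomicReference<>();
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                executor.execute(() -> {
                    try {
                        // Stand-in for the real work; task 2 fails on purpose
                        if (id == 2) {
                            throw new IllegalStateException("task 2 failed");
                        }
                    } catch (Exception e) {
                        // Error reporting is kept separate from the latch
                        firstError.compareAndSet(null, e);
                    } finally {
                        // Always runs, so await() below cannot hang
                        latch.countDown();
                    }
                });
            }
            latch.await();
            return firstError.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("first error: " + runAll(4));
    }
}
```

The latch only answers "has everything finished?", while the reference answers "did anything go wrong?", so a failure can never leave the main thread waiting.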

Upvotes: 4
