roymustang86

Reputation: 8553

Java ExecutorService: how to handle timeouts

I am trying to create a stub that calls multiple web services simultaneously, but I get an uncaught CancellationException when I read the results. Here is the main method:

    ExecutorService pool= Executors.newFixedThreadPool(7);
    List<Future<Long>> futureList = new ArrayList<Future<Long>>();
    Set<CallableDemo> callList = new HashSet<CallableDemo>();

    callList.add(new CallableDemo(0L));
    callList.add(new CallableDemo(10L));
    callList.add(new CallableDemo(20L));
    callList.add(new CallableDemo(30L));
    callList.add(new CallableDemo(40L));
    callList.add(new CallableDemo(50L));
    callList.add(new CallableDemo(-600L));
    callList.add(new CallableDemo(-700L));
    callList.add(new CallableDemo(-800L));
    callList.add(new CallableDemo(-900L));

    futureList = pool.invokeAll(callList, 15L, TimeUnit.SECONDS);

    for (Future<Long> fut : futureList) {
        try {
            System.out.println(new Date() + "::" + fut.get());
        } catch (InterruptedException e) {
            System.out.println("Done :)");
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            System.out.println("Done :)");
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    pool.shutdown();

And here is CallableDemo:

    import java.util.concurrent.Callable;

    public class CallableDemo implements Callable<Long>
    {
        private Long count = 0L;

        public CallableDemo(Long i)
        {
            this.count = i;
        }

        public Long call() throws Exception
        {
            Long i;
            for (i = this.count; i < 100L; i++)
            {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return i;
                }
                System.out.println(Thread.currentThread().getName() + " - " + i);
            }
            return i;
        }
    }

Because I have specified a timeout of 15 seconds, here is what I get as the output:

pool-2-thread-1 - -764
pool-2-thread-6 - -744
pool-2-thread-2 - 97
pool-2-thread-4 - -563
pool-2-thread-1 - -763
pool-2-thread-6 - -743
pool-2-thread-5 - -463
Exception in thread "main" java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:220)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at CallableTest.main(CallableTest.java:44)

As you can see, thread 3 has finished. At the end of the timeout period, I want to cancel any threads that have not finished and record an error status for them, without the exception propagating all the way back. How do I achieve this?

Also, I want to show the results of all the threads that executed and those that didn't.

For some reason, the answers are getting deleted. Please keep them there; they might help others who aren't looking for exactly this.

Upvotes: 1

Views: 1742

Answers (2)

roymustang86

Reputation: 8553

I finally figured out how to check if the thread completed or not, and how to handle cancelled threads. Below is the code.

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class CallableTest
    {
        public static void main(String args[]) throws Exception
        {
            ExecutorService pool = Executors.newFixedThreadPool(10);
            List<Future<Long>> futureList = new ArrayList<Future<Long>>();
            Set<CallableDemo> callList = new HashSet<CallableDemo>();

            callList.add(new CallableDemo(0L));
            callList.add(new CallableDemo(10L));
            callList.add(new CallableDemo(20L));
            callList.add(new CallableDemo(30L));
            callList.add(new CallableDemo(40L));
            callList.add(new CallableDemo(50L));
            callList.add(new CallableDemo(-600L));
            callList.add(new CallableDemo(-700L));
            callList.add(new CallableDemo(-800L));
            callList.add(new CallableDemo(-900L));

            // invokeAll blocks until every task has finished or the timeout expires;
            // tasks still running at the timeout are cancelled
            futureList = pool.invokeAll(callList, 15L, TimeUnit.SECONDS);

            for (Future<Long> fut : futureList) {
                try {
                    // get() on a cancelled Future throws CancellationException,
                    // so only read the result of tasks that were not cancelled
                    if (!fut.isCancelled())
                        System.out.println(new Date() + "::" + fut.get());
                } catch (InterruptedException e) {
                    System.out.println("Done :)");
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    System.out.println("Done :)");
                }
            }
            // shut down the pool so its worker threads can exit
            pool.shutdown();
            System.out.println("Done :)");
        }
    }

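I use the Future method isCancelled() to skip the tasks that timed out. Note that isDone() cannot make this distinction here: invokeAll returns only once every Future is done, and a cancelled Future also reports isDone() as true. See http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html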

    import java.util.concurrent.Callable;

    public class CallableDemo implements Callable<Long>
    {
        private Long count = 0L;

        public CallableDemo(Long i)
        {
            this.count = i;
        }

        public Long call() throws InterruptedException
        {
            Long i;
            for (i = this.count; i < 100L; i++)
            {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // the task was cancelled at the timeout: stop counting and return early
                    System.out.println("Interrupted " + Thread.currentThread().getName());
                    return i;
                }
            }
            System.out.println("Finished " + Thread.currentThread().getName());
            return i;
        }
    }
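
The question also asks to show a result for every task, including the ones that did not finish. The isCancelled() check above can be extended into a per-task status instead of silently skipping cancelled tasks. The following is only a minimal sketch under some assumptions: the class name InvokeAllStatusSketch, the method runWithTimeout, and the status strings are made up for illustration, and the tasks are passed as a List rather than a HashSet so that the order of the returned futures matches the order of the tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original code
class InvokeAllStatusSketch {

    /** Runs all tasks with a shared timeout and returns one status string per task, in input order. */
    static List<String> runWithTimeout(List<Callable<Long>> tasks, long timeoutMillis)
            throws InterruptedException {
        // Assumption: one thread per task, so no task waits in the queue
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        List<String> statuses = new ArrayList<>();
        try {
            // Returns when all tasks are done or the timeout expires;
            // tasks that have not completed by then are cancelled
            List<Future<Long>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (Future<Long> f : futures) {
                if (f.isCancelled()) {
                    // get() would throw CancellationException here, so record a status instead
                    statuses.add("TIMED_OUT");
                } else {
                    try {
                        statuses.add("OK " + f.get());
                    } catch (ExecutionException e) {
                        // The task itself threw an exception
                        statuses.add("FAILED " + e.getCause());
                    }
                }
            }
        } finally {
            pool.shutdown();
        }
        return statuses;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<Long>> tasks = new ArrayList<>();
        tasks.add(() -> 42L);                                          // finishes at once
        tasks.add(() -> { Thread.sleep(60_000); return 0L; });         // runs past the timeout
        tasks.add(() -> { throw new IllegalStateException("boom"); }); // fails
        System.out.println(runWithTimeout(tasks, 500));
    }
}
```

With this shape nothing propagates back to the caller for a timed-out task; the caller just sees its status in the list.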

Upvotes: 1

Ralf H

Reputation: 1474

Thread pools that implement ExecutorService do not provide access to their worker threads. Therefore, use a custom ThreadFactory that stores the threads it creates in a collection, so that you can interrupt them later. Make sure your Runnable jobs handle interrupts properly and set the error status. Provide a Callable return type that can express both the error status and an actual return value.
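
A rough sketch of the approach described above might look like the following. All names here (TrackedPoolSketch, Result, TrackingThreadFactory, countingTask, runAll) are hypothetical and chosen for illustration. The sketch assumes the pool has one thread per task, so no task is still queued when the workers are interrupted, and that every task reacts to an interrupt; a task that ignores interrupts would make the final get() block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

class TrackedPoolSketch {

    /** Hypothetical result type: a value plus an error status (null means success). */
    static final class Result {
        final Long value;
        final String error;
        Result(Long value, String error) { this.value = value; this.error = error; }
        @Override public String toString() {
            return error == null ? "OK(" + value + ")" : "ERROR(" + error + " at " + value + ")";
        }
    }

    /** Hypothetical factory that remembers every worker thread it creates. */
    static final class TrackingThreadFactory implements ThreadFactory {
        final List<Thread> threads = new CopyOnWriteArrayList<>();
        @Override public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            threads.add(t);
            return t;
        }
    }

    /** Counts from start to limit, sleeping between steps; returns an error Result if interrupted. */
    static Callable<Result> countingTask(long start, long limit, long stepMillis) {
        return () -> {
            long i = start;
            try {
                for (; i < limit; i++) Thread.sleep(stepMillis);
                return new Result(i, null);
            } catch (InterruptedException e) {
                return new Result(i, "timed out");
            }
        };
    }

    static List<Result> runAll(List<Callable<Result>> tasks, long timeoutMillis) throws Exception {
        TrackingThreadFactory factory = new TrackingThreadFactory();
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size(), factory);
        List<Future<Result>> futures = new ArrayList<>();
        for (Callable<Result> task : tasks) futures.add(pool.submit(task));
        pool.shutdown(); // accept no new tasks; submitted ones keep running
        if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
            // Interrupt the workers directly; the futures are not cancelled,
            // so get() below still returns whatever Result the task produced
            for (Thread t : factory.threads) t.interrupt();
        }
        List<Result> results = new ArrayList<>();
        for (Future<Result> f : futures) results.add(f.get());
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Result>> tasks = new ArrayList<>();
        tasks.add(countingTask(0, 3, 10));       // finishes well before the timeout
        tasks.add(countingTask(0, 100_000, 10)); // still running when the timeout hits
        for (Result r : runAll(tasks, 500)) System.out.println(r);
    }
}
```

The difference from invokeAll with a timeout is that the futures are never cancelled, so each task gets to hand back its own error status and partial value rather than get() throwing CancellationException.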

Upvotes: 0
