Reputation: 8553
I am trying to create a stub for calling multiple web services simultaneously, but I am getting an error when I am handling the CancellationException. Here is the main method
ExecutorService pool= Executors.newFixedThreadPool(7);
List<Future<Long>> futureList = new ArrayList<Future<Long>>();
Set<CallableDemo> callList = new HashSet<CallableDemo>();
callList.add(new CallableDemo(0L));
callList.add(new CallableDemo(10L));
callList.add(new CallableDemo(20L));
callList.add(new CallableDemo(30L));
callList.add(new CallableDemo(40L));
callList.add(new CallableDemo(50L));
callList.add(new CallableDemo(-600L));
callList.add(new CallableDemo(-700L));
callList.add(new CallableDemo(-800L));
callList.add(new CallableDemo(-900L));
futureList = pool.invokeAll(callList, 15L, TimeUnit.SECONDS);
for(Future<Long> fut : futureList){
try {
System.out.println(new Date()+ "::"+fut.get());
} catch (InterruptedException e) {
System.out.println("Done :)");
e.printStackTrace();
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
System.out.println("Done :)");
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
executor.shutdown();
And here is CallableDemo,
import java.util.concurrent.Callable;
public class CallableDemo implements Callable<Long>
{
private Long count = 0L;
public CallableDemo(Long i)
{
this.count = i;
}
public Long call() throws Exception
{
Long i;
for( i = this.count; i < 100L; i++)
{
try { Thread.sleep(100); }
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return i;
}
System.out.println(Thread.currentThread().getName() + " - " + i);
}
return i;
}
}
Because I have specified a timeout of 15 seconds, here is what I get as the output:
pool-2-thread-1 - -764
pool-2-thread-6 - -744
pool-2-thread-2 - 97
pool-2-thread-4 - -563
pool-2-thread-1 - -763
pool-2-thread-6 - -743
pool-2-thread-5 - -463
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:220)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at CallableTest.main(CallableTest.java:44)
As you can see, the thread 3 has finished. What I am trying to do is, at the end of the timeout period, if any of the threads have not finished by then, I want to cancel those threads and put an error status, but not throw the exception all the way back. How do I achieve this?
Also, I want to show the results of all the threads that executed and those that didn't.
For some reason, the answers are getting deleted. Please keep them there, it might help others who arent exactly looking for this.
Upvotes: 1
Views: 1742
Reputation: 8553
I finally figured out how to check if the thread completed or not, and how to handle cancelled threads. Below is the code.
public class CallableTest
{
public static void main(String args[]) throws Exception
{
ExecutorService executor = Executors.newFixedThreadPool(10);
ExecutorService pool= Executors.newFixedThreadPool(10);
List<Future<Long>> futureList = new ArrayList<Future<Long>>();
Set<CallableDemo> callList = new HashSet<CallableDemo>();
//submit Callable tasks to be executed by thread pool
//<Long> future = executor.submit(callable);
//add Future to the list, we can get return value using Future
//list.add(future);
callList.add(new CallableDemo(0L));
callList.add(new CallableDemo(10L));
callList.add(new CallableDemo(20L));
callList.add(new CallableDemo(30L));
callList.add(new CallableDemo(40L));
callList.add(new CallableDemo(50L));
callList.add(new CallableDemo(-600L));
callList.add(new CallableDemo(-700L));
callList.add(new CallableDemo(-800L));
callList.add(new CallableDemo(-900L));
futureList = pool.invokeAll(callList, 15L, TimeUnit.SECONDS);
for(Future<Long> fut : futureList){
try {
//print the return value of Future, notice the output delay in console
// because Future.get() waits for task to get completed
if( !fut.isCancelled())
System.out.println(new Date()+ "::"+fut.get());
} catch (InterruptedException e) {
//e.printStackTrace();
System.out.println("Done :)");
//Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
//e.printStackTrace();
System.out.println("Done :)");
//Thread.currentThread().interrupt();
}
}
//shut down the executor service now
executor.shutdown();
System.out.println("Done :)");
}
}
I use the Future method isCancelled(), or you can use isDone() as well, http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html
public class CallableDemo implements Callable<Long>
{
private Long count = 0L;
public CallableDemo(Long i)
{
this.count = i;
}
public Long call() throws InterruptedException
{
Long i;
for( i = this.count; i < 100L; i++)
{
try { Thread.sleep(100); }
catch (InterruptedException e) {
System.out.println("Interruped " + Thread.currentThread().getName());
//Thread.currentThread().interrupt();
return i;
}
//System.out.println(Thread.currentThread().getName() + " - " + i);
}
System.out.println("Finished " + Thread.currentThread().getName());
return i;
}
}
Upvotes: 1
Reputation: 1474
ThreadPools implementing ExecutorService
s do not provide access to their worker threads. Therefore, use a custom ThreadFactory
. Make it store its threads in a collection so you can later interrupt them. Make sure your job Runnable
s have proper interrupt handling to set the error status. Provide a callable return type that can express both the error status and an actual return value.
Upvotes: 0