Reputation: 30133
I'm looking for an ExecutorService implementation that can be provided with a timeout. Tasks that are submitted to the ExecutorService are interrupted if they take longer than the timeout to run. Implementing such a beast isn't such a difficult task, but I'm wondering if anybody knows of an existing implementation.
Here's what I came up with based on some of the discussion below. Any comments?
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
public void shutdown() {
public List<Runnable> shutdownNow() {
return super.shutdownNow();
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
public void run() {
Upvotes: 121
Views: 139066
Reputation: 11977
Unfortunately the solution is flawed. There is a sort of bug with ScheduledThreadPoolExecutor
, also reported in this question: cancelling a submitted task does not fully release the memory resources associated with the task; the resources are released only when the task expires.
If you therefore create a TimeoutThreadPoolExecutor
with a fairly long expiration time (a typical usage), and submit tasks fast enough, you end up filling the memory - even though the tasks actually completed successfully.
You can see the problem with the following (very crude) test program:
public static void main(String[] args) throws InterruptedException {
ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
//ExecutorService service = Executors.newFixedThreadPool(1);
try {
final AtomicInteger counter = new AtomicInteger();
for (long i = 0; i < 10000000; i++) {
service.submit(new Runnable() {
public void run() {
if (i % 10000 == 0) {
System.out.println(i + "/" + counter.get());
while (i > counter.get()) {
} finally {
The program exhausts the available memory, although it waits for the spawned Runnable
s to complete.
I though about this for a while, but unfortunately I could not come up with a good solution.
I discovered this issue was reported as JDK bug 6602600, and appears to have been fixed in Java 7.
Upvotes: 10
Reputation: 4285
What about this?
final ExecutorService myExecutorService = ...;
// create CompletableFuture to get result/exception from runnable in specified timeout
final CompletableFuture<Object> timeoutFuture = new CompletableFuture<>();
// submit runnable and obtain cancellable Future from executor
final Future<?> cancellableFuture = myExecutorService.submit(() -> {
try {
Object result = myMethod(...);
} catch (Exception e) {
// block the calling thread until "myMethod" will finish or time out (1 second)
try {
Object result = timeoutFuture.get(1000, TimeUnit.MILLISECONDS);
// "myMethod" completed normally
} catch (TimeoutException te) {
// "myMethod" timed out
// ...
} catch (ExecutionException ee) {
// "myMethod" completed exceptionally - get cause
final Throwable cause = ee.getCause();
// ...
} catch (InterruptedException ie) {
// future interrupted
// ...
} finally {
// timeoutFuture.cancel(true); // CompletableFuture does not support cancellation
cancellableFuture.cancel(true); // Future supports cancellation
Upvotes: 1
Reputation: 156
You can use this implementation that ExecutorService provides
invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
executor.invokeAll(Arrays.asList(task), 2 , TimeUnit.SECONDS);
However, in my case, I could not as Arrays.asList took extra 20ms.
Upvotes: 0
Reputation: 2493
Using John W answer I created an implementation that correctly begin the timeout when the task starts its execution. I even write a unit test for it :)
However, it does not suit my needs since some IO operations do not interrupt when Future.cancel()
is called (ie when Thread.interrupt()
is called).
Some examples of IO operation that may not be interrupted when Thread.interrupt()
is called are Socket.connect
(and I suspect most of IO operation implemented in
). All IO operations in java.nio
should be interruptible when Thread.interrupt()
is called. For example, that is the case for
Anyway if anyone is interested, I created a gist for a thread pool executor that allows tasks to timeout (if they are using interruptible operations...):
Upvotes: 1
Reputation: 51
check if this works for you,
public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
Map<K,V> context, Task<T,S,K,V> someTask){
return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
collection.forEach(value -> {
callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();
int count = 0;
while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
Collection<ResponseObject<T>> responseCollection = new ArrayList<>();
Future<T> future = futures.poll();
ResponseObject<T> responseObject = null;
try {
T response = future.get(timeToCompleteEachTask, timeUnit);
responseObject = ResponseObject.<T>builder().data(response).build();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} catch (TimeoutException e) {
} finally {
if (Objects.nonNull(responseObject)) {
futures.remove(future);//remove this
Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
Future<T> f = threadPoolExecutor.submit(callable);
return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
return callableLinkedBlockingQueue.poll();
return null;
you can restrict the no of thread uses from scheduler as well as put timeout on the task.
Upvotes: 0
Reputation: 40256
You can use a ScheduledExecutorService for this. First you would submit it only once to begin immediately and retain the future that is created. After that you can submit a new task that would cancel the retained future after some period of time.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
final Future handler = executor.submit(new Callable(){ ... });
executor.schedule(new Runnable(){
public void run(){
}, 10000, TimeUnit.MILLISECONDS);
This will execute your handler (main functionality to be interrupted) for 10 seconds, then will cancel (i.e. interrupt) that specific task.
Upvotes: 107
Reputation: 1964
After ton of time to survey,
Finally, I use invokeAll
method of ExecutorService
to solve this problem.
That will strictly interrupt the task while task running.
Here is example
ExecutorService executorService = Executors.newCachedThreadPool();
try {
List<Callable<Object>> callables = new ArrayList<>();
// Add your long time task (callable)
callables.add(new VaryLongTimeTask());
// Assign tasks for specific execution timeout (e.g. 2 sec)
List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
for (Future<Object> future : futures) {
// Getting result
} catch (InterruptedException e) {
The pro is you can also submit ListenableFuture
at the same ExecutorService
Just slightly change the first line of code.
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
is the Listening feature of ExecutorService
at google guava project ( )
Upvotes: 4
Reputation: 1
What about this alternative idea :
Small sample is here :
public class AlternativeExecutorService
private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;
public AlternativeExecutorService()
scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
public void pushTask(OwnTask task)
ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable
futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
public void shutdownInternalScheduledExecutor()
long getCurrentMillisecondsTime()
return Calendar.getInstance().get(Calendar.MILLISECOND);
class ListenableFutureTask
private final ListenableFuture<Void> future;
private final OwnTask task;
private final long milliSecEndTime;
private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
this.future = future;
this.task = task;
this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
ListenableFuture<Void> getFuture()
return future;
OwnTask getTask()
return task;
long getMilliSecEndTime()
return milliSecEndTime;
class TimeoutManagerJob implements Runnable
CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
return futureQueue;
public void run()
long currentMileSecValue = getCurrentMillisecondsTime();
for (ListenableFutureTask futureTask : futureQueue)
consumeFuture(futureTask, currentMileSecValue);
private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
ListenableFuture<Void> future = futureTask.getFuture();
boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
if (isTimeout)
if (!future.isDone())
class OwnTask implements Callable<Void>
private long timeoutDuration;
private TimeUnit timeUnit;
OwnTask(long timeoutDuration, TimeUnit timeUnit)
this.timeoutDuration = timeoutDuration;
this.timeUnit = timeUnit;
public Void call() throws Exception
// do logic
return null;
public long getTimeoutDuration()
return timeoutDuration;
public TimeUnit getTimeUnit()
return timeUnit;
Upvotes: 0
Reputation: 9
It seems problem is not in JDK bug 6602600 ( it was solved at 2010-05-22), but in incorrect call of sleep(10) in circle. Addition note, that the main Thread must give directly CHANCE to other threads to realize thier tasks by invoke SLEEP(0) in EVERY branch of outer circle. It is better, I think, to use Thread.yield() instead of Thread.sleep(0)
The result corrected part of previous problem code is such like this:
if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
// while (i > counter.get()) {
// Thread.sleep(10);
// }
It works correctly with amount of outer counter up to 150 000 000 tested circles.
Upvotes: 1
Reputation: 9816
How about using the ExecutorService.shutDownNow()
method as described in It seems to be the simplest solution.
Upvotes: 1
Reputation: 75456
Wrap the task in FutureTask and you can specify timeout for the FutureTask. Look at the example in my answer to this question,
Upvotes: 6