Reputation: 13190
In my code I submit some tasks to an ExecutorService and then wait for them to complete using shutdown() and awaitTermination(). But if any one task takes longer than a certain period to complete, I want it cancelled without affecting the other tasks. I use code adapted from "ExecutorService that interrupts tasks after a timeout", as follows:
package com.jthink.jaikoz.memory;
import com.jthink.jaikoz.MainWindow;
import java.util.List;
import java.util.concurrent.*;
/**
 * A {@link ThreadPoolExecutor} that interrupts the worker thread of any task which is still
 * running once a fixed timeout has elapsed since the task started.
 *
 * <p>Each task gets its own timeout task, scheduled on a private single-threaded
 * {@link ScheduledExecutorService} in {@link #beforeExecute} and cancelled in
 * {@link #afterExecute}. A timeout of zero or less disables the mechanism.
 *
 * <p>The timeout only delivers {@link Thread#interrupt()}; a task that does not respond to
 * interruption keeps running, and the pool cannot terminate until it returns.
 */
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {

    private final long timeout;
    private final TimeUnit timeoutUnit;

    // Set by the thread calling shutdown() and read by worker threads in afterExecute(),
    // hence volatile.
    private volatile boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    // Maps each running task to the timeout task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture<?>> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture<?>>();

    /** @return the per-task timeout, expressed in {@link #getTimeoutUnit()} units */
    public long getTimeout()
    {
        return timeout;
    }

    /** @return the unit of {@link #getTimeout()} */
    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    /**
     * Creates a fixed-size pool of {@code workerSize} threads backed by an unbounded queue.
     *
     * @param workerSize    used as both core and maximum pool size
     * @param threadFactory factory for the worker threads
     * @param timeout       per-task timeout; values of zero or less disable the timeout
     * @param timeoutUnit   unit of {@code timeout}
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /** Same as the matching {@link ThreadPoolExecutor} constructor, plus the per-task timeout. */
    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /** Same as the matching {@link ThreadPoolExecutor} constructor, plus the per-task timeout. */
    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /** Same as the matching {@link ThreadPoolExecutor} constructor, plus the per-task timeout. */
    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /** Same as the matching {@link ThreadPoolExecutor} constructor, plus the per-task timeout. */
    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /**
     * Starts an orderly shutdown. The timeout executor is deliberately left running so that
     * tasks still queued can have their timeouts scheduled; it is released in
     * {@link #terminated()}.
     */
    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    /**
     * Stops the pool and then the timeout executor.
     *
     * @return the tasks that never started, as returned by {@link ThreadPoolExecutor#shutdownNow()}
     */
    @Override
    public List<Runnable> shutdownNow() {
        // Stop the pool first so fewer tasks can start while the timer is going away.
        List<Runnable> neverStarted = super.shutdownNow();
        timeoutExecutor.shutdownNow();
        return neverStarted;
    }

    /**
     * Releases the timeout executor once the pool itself has terminated. Without this the
     * timer thread would stay alive after a plain {@link #shutdown()}.
     */
    @Override
    protected void terminated() {
        try {
            timeoutExecutor.shutdownNow();
        } finally {
            super.terminated();
        }
    }

    /** Schedules the timeout for the task about to run on thread {@code t}. */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (timeout > 0) {
            try {
                // Schedule a task to interrupt the thread that is running the task after time timeout
                final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
                runningTasks.put(r, scheduled);
            } catch (RejectedExecutionException ignored) {
                // The timer was already stopped by shutdownNow(); the pool is interrupting its
                // workers anyway, so let the task proceed without a timeout instead of letting
                // the exception kill the worker thread.
            }
        }
        super.beforeExecute(t, r);
    }

    /** Cancels the timeout belonging to the task that has just finished. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Remove mapping and cancel timeout task
        ScheduledFuture<?> timeoutTask = runningTasks.remove(r);
        if (timeoutTask != null) {
            timeoutTask.cancel(false);
        }
        // Runs once per finishing task, so this message can be logged several times: every task
        // that completes after shutdown() while the queue is empty reports it.
        if (isShutdown && getQueue().isEmpty()) {
            // Queue is empty so all tasks either finished or currently running
            MainWindow.logger.severe("---Thread Pool Queue is Empty");
        }
    }

    /**
     * Interrupts the worker thread it was created for.
     */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            MainWindow.logger.severe("Cancelling task because taking too long");
            thread.interrupt();
        }
    }
}
and a test case covering both situations (tasks that have time to complete and tasks that don't), both of which work:
package com.jthink.jaikoz;
import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by Paul on 08/12/2014.
*/
public class TestThreadPool extends TestCase
{
    /** Runs the workload with a 6 second timeout, longer than each task's 5 second sleep. */
    public void testThreadPoolTasksComplete() throws Exception
    {
        runSleepingTasks(6);
    }

    /** Runs the workload with a 3 second timeout, shorter than each task's 5 second sleep. */
    public void testThreadPoolTasksCancelled() throws Exception
    {
        runSleepingTasks(3);
    }

    /**
     * Submits ten tasks that each sleep for five seconds to a ten-thread pool, shuts the pool
     * down and waits up to one day for it to terminate.
     *
     * @param timeoutSeconds per-task timeout handed to the pool, in seconds
     */
    private static void runSleepingTasks(long timeoutSeconds) throws Exception
    {
        final TimeoutThreadPoolExecutor pool = new TimeoutThreadPoolExecutor(
                10, 10, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                timeoutSeconds, TimeUnit.SECONDS);

        int submitted = 0;
        while (submitted < 10)
        {
            pool.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }
            });
            submitted++;
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }
}
and in my code it appears to work:
/**
 * Submits one {@code CorrectFromMusicBrainzWorker} per match key to the executor service,
 * shuts the service down and waits for the workers to finish.
 *
 * <p>The wait is bounded by the number of match keys multiplied by the executor's per-task
 * timeout.
 *
 * @param matchKeyToSongs songs grouped by match key; one worker is created per key
 * @return {@code false} if the task had already been stopped or the wait was interrupted,
 *         {@code true} otherwise (including when the wait timed out, which is only logged)
 * @throws JaikozException declared by the signature; nothing in this body throws it
 *         explicitly, so presumably it comes from a callee
 */
private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
        throws JaikozException
{
    if (stopTask)
    {
        MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
        return false;
    }

    TimeoutThreadPoolExecutor es = getExecutorService();
    for (MatchKey matchKey : matchKeyToSongs.keySet())
    {
        List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
        es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs));
    }
    es.shutdown();

    try
    {
        long maxWait = matchKeyToSongs.keySet().size() * es.getTimeout();
        // awaitTermination() returns false when the wait elapsed before the pool terminated;
        // surface that instead of silently treating it like a normal completion.
        if (!es.awaitTermination(maxWait, es.getTimeoutUnit()))
        {
            MainWindow.logger.warning(this.getClass() + " timed out waiting for workers to finish");
        }
    }
    catch (InterruptedException ie)
    {
        MainWindow.logger.warning(this.getClass() + " has been interrupted");
        // Restore the interrupt flag so callers further up the stack can still see the cancellation.
        Thread.currentThread().interrupt();
        return false;
    }
    return true;
}
however for one customer even though
---Thread Pool Queue is Empty
is output, awaitTermination() doesn't return, only eventually returning when the user cancels the task two hours later. Full log extract here:
14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted
So how can it be that awaitTermination() is not returning even though the logs show the queue is empty and therefore shutdown() has been called on both the Executor itself and the embedded timeoutExecutor?
I have had a few thoughts about this myself but don't know the answer.
Firstly, why is it actually necessary to shut down the timeoutExecutor for awaitTermination() to return anyway? In my subclass awaitTermination() is not overridden, so if all tasks have completed, what does it matter whether the timeoutExecutor (which awaitTermination() knows nothing about) is shut down or not?
Secondly, why does "---Thread Pool Queue is Empty" sometimes get output more than once?
Upvotes: 1
Views: 1459
Reputation: 16833
I made a custom modification in TimeoutThreadPoolExecutor
and it's working fine.
/**
 * A {@link ThreadPoolExecutor} that interrupts the worker thread of any task which is still
 * running once a fixed timeout has elapsed since the task started.
 *
 * <p>Each task gets its own timeout task, scheduled on a private single-threaded
 * {@link ScheduledExecutorService} in {@link #beforeExecute} and cancelled in
 * {@link #afterExecute}. A timeout of zero or less disables the mechanism.
 *
 * <p>The timeout executor is released in {@link #terminated()}, i.e. only after every
 * submitted task has finished. Shutting it down from {@code afterExecute} would make
 * {@code beforeExecute} throw {@link RejectedExecutionException} for tasks that were still
 * queued when {@code shutdown()} was called, so those tasks would never run.
 */
public static class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
    private final long timeout;
    private final TimeUnit timeoutUnit;
    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    // Maps each running task to the timeout task that would interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture<?>> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture<?>>();

    /** Same as the matching {@link ThreadPoolExecutor} constructor, plus the per-task timeout. */
    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /** Same as the matching {@link ThreadPoolExecutor} constructor, plus the per-task timeout. */
    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /** Same as the matching {@link ThreadPoolExecutor} constructor, plus the per-task timeout. */
    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /** Same as the matching {@link ThreadPoolExecutor} constructor, plus the per-task timeout. */
    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /**
     * Stops the pool and then the timeout executor.
     *
     * @return the tasks that never started, as returned by {@link ThreadPoolExecutor#shutdownNow()}
     */
    @Override
    public List<Runnable> shutdownNow() {
        // Stop the pool first so fewer tasks can start while the timer is going away.
        List<Runnable> neverStarted = super.shutdownNow();
        timeoutExecutor.shutdownNow();
        return neverStarted;
    }

    /** Releases the timeout executor once the pool itself has terminated. */
    @Override
    protected void terminated() {
        try {
            timeoutExecutor.shutdownNow();
        } finally {
            super.terminated();
        }
    }

    /** Schedules the timeout for the task about to run on thread {@code t}. */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (timeout > 0) {
            try {
                final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
                runningTasks.put(r, scheduled);
            } catch (RejectedExecutionException ignored) {
                // The timer was already stopped by shutdownNow(); the pool is interrupting its
                // workers anyway, so let the task proceed without a timeout instead of letting
                // the exception kill the worker thread.
            }
        }
        super.beforeExecute(t, r);
    }

    /** Cancels the timeout belonging to the task that has just finished. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        ScheduledFuture<?> timeoutTask = runningTasks.remove(r);
        if (timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    /** Interrupts the worker thread it was created for and reports the cancellation. */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
            System.out.println("Cancelled");
        }
    }
}
Case 1 : No timeout
// Case 1: the 6 second timeout is longer than the task's 5 second sleep.
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
        100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
        6, TimeUnit.SECONDS);
executorService.submit(new Callable<Object>()
{
    @Override
    public Object call() throws Exception
    {
        Thread.sleep(5000);
        // Prints the same message as the output quoted for this case ("Task done").
        System.out.println("Task done");
        return null;
    }
});
// No further tasks are accepted; wait (up to one day) for the submitted task to finish.
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");
It prints :
Task done
Program done
Case 2 : Timeout
// Case 2: the 3 second timeout is shorter than the task's 5 second sleep.
final Callable<Object> sleeper = new Callable<Object>()
{
    @Override
    public Object call() throws Exception
    {
        Thread.sleep(5000);
        System.out.println("Task done");
        return null;
    }
};
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
        100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
        3, TimeUnit.SECONDS);
executorService.submit(sleeper);

// No further tasks are accepted; wait (up to one day) for the pool to terminate.
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");
It prints :
Cancelled
Program done
Upvotes: 2