Reputation: 847
I want to implement a threadpool that tasks can be executed for certain times by overriding afterExecute
hook. Can I just submit the argument Runnable r
again?
Here is my initial implementation.
public class RetriableThreadPool extends ThreadPoolExecutor {
static final int MAXRETRYTIMES = 5;
int retryTimes = 0;
public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
retryTimes = 0;
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (retryTimes < MAXRETRYTIMES) {
retryTimes++;
super.submit(r);
}
}
}
In this initial implementation, I just allow one task submitted.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
public static void main(String[] args) {
RetriableThreadPool retriableThreadPool = new RetriableThreadPool(10, 10, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
retriableThreadPool.execute(new Runnable() {
int num = 0;
@Override
public void run() {
// TODO Auto-generated method stub
num = num + 123;
System.out.println(num);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// retriableThreadPool.shutdown();
}
}
In this example, I got weird output:
123
246
If the runnable can be resubmitted, I think I should get 5 outputs. If this cannot be resubmitted. Only 123 should be the result. I don't understand the reason of this output.
I modified the code thanks to nogard
public class RetriableThreadPool extends ThreadPoolExecutor {
static final int MAXRETRYTIMES = 5;
int retryTimes = 0;
public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
retryTimes = 0;
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (retryTimes < MAXRETRYTIMES) {
retryTimes++;
super.execute(r);
}
}
}
I have 3 other questions:
submit
just like afterExecute
for execute
Upvotes: 1
Views: 429
Reputation: 9716
I think the reason of such behavior is that you submit task in the afterExecute method instead of execute, and submit will not trigger afterExecute
callback again. That's why you see only 2 lines in the output: first one is from original execute, and the second one is from submit.
Moreover, you never increment retry counter, your task will be always resubmitted
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
++ retryTimes;
if (retryTimes < MAXRETRYTIMES) {
super.execute(r);
}
}
Update for your 3 questions:
There are multiple options:
For the hook, I would implement in with Decorator pattern: something like this:
public class YourExecutor {
@Override
public void submit(Runnable task) {
return super.submit(new TaskDecorator(task));
}
protected void onCompletedTask(Runnable task) {
// callback
}
private class TaskDecorator implements Runnable {
private final Runnable delegate;
public TaskDecorator(Runnable delegate) {
this.delegate = delegate;
}
@Override
public void run() {
this.delegate.run();
onCompletedTask(delegate);
}
}
Upvotes: 2