chenatu

Reputation: 847

Retriable threadpool by overriding afterExecute(Runnable r, Throwable t)

I want to implement a thread pool in which a task can be executed a certain number of times, by overriding the afterExecute hook. Can I just submit the Runnable r argument again?

Here is my initial implementation.

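import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RetriableThreadPool extends ThreadPoolExecutor {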

  static final int MAXRETRYTIMES = 5;

  int retryTimes = 0;

  public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    retryTimes = 0;
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (retryTimes < MAXRETRYTIMES) {
      retryTimes++;
      super.submit(r);
    }
  }

}

In this initial implementation, I only allow a single task to be submitted.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

  public static void main(String[] args) {
    RetriableThreadPool retriableThreadPool = new RetriableThreadPool(10, 10, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    retriableThreadPool.execute(new Runnable() {
      int num = 0;

      @Override
      public void run() {
        // TODO Auto-generated method stub
        num = num + 123;
        System.out.println(num);
      }

    });
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    // retriableThreadPool.shutdown();
  }
}

In this example, I got weird output:

123
246

If the runnable can be resubmitted, I think I should get 5 outputs. If it cannot be resubmitted, only 123 should be printed. I don't understand the reason for this output.


I modified the code thanks to nogard, replacing submit with execute:

public class RetriableThreadPool extends ThreadPoolExecutor {

  static final int MAXRETRYTIMES = 5;

  int retryTimes = 0;

  public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    retryTimes = 0;
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (retryTimes < MAXRETRYTIMES) {
      retryTimes++;
      super.execute(r);
    }
  }
}

I have 3 other questions:

  1. How can I retry the runnable with its original state? In this case, I expected the output to be 123 five times.
  2. How can I add a hook for submit, like afterExecute for execute?
  3. Is there a good existing implementation of a retriable thread pool? I want the runnable to be retried when an exception is thrown or when a callable returns certain results.

Upvotes: 1

Views: 429

Answers (1)

nogard

Reputation: 9716

I think the reason for this behavior is that you resubmit the task in afterExecute with submit instead of execute. submit wraps the Runnable in a FutureTask, and a FutureTask runs its body only once, so resubmitting an already-completed FutureTask from afterExecute does nothing visible. That's why you see only 2 lines in the output: the first is from the original execute, and the second is from the first submit.
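
For background, submit wraps the Runnable in a FutureTask before passing it to execute, and a FutureTask executes its body at most once. A minimal sketch that checks this in isolation (the class and method names are made up for illustration):

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureTaskRunsOnce {

    // Runs the same FutureTask twice and reports how often its body executed.
    static int runTwice() {
        AtomicInteger runs = new AtomicInteger();
        FutureTask<Void> task = new FutureTask<Void>(() -> { runs.incrementAndGet(); }, null);
        task.run(); // executes the body; the task is now completed
        task.run(); // returns immediately; the body is not executed again
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runTwice()); // prints 1
    }
}
```

Passing the task back through execute avoids the extra wrapper, because execute hands the same Runnable to the worker unchanged.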

Moreover, make sure the retry counter is incremented on every pass, otherwise the task will be resubmitted forever:

@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    ++ retryTimes;
    if (retryTimes < MAXRETRYTIMES) {
        super.execute(r);
    }
}
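
To check the behavior end to end, here is a small self-contained sketch of the same idea. The names RetryDemo, RetryPool, MAX_RUNS and countRuns are assumptions for illustration, the pool size of 2 is arbitrary, and the counter is pool-wide, so this only suits a single task as in the question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryDemo {

    static final int MAX_RUNS = 5;

    // Pool that re-executes a finished task until it has run MAX_RUNS times in total.
    static class RetryPool extends ThreadPoolExecutor {
        // Pool-wide counter shared by every task (assumption: only one task is used).
        private final AtomicInteger runs = new AtomicInteger();

        RetryPool() {
            super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (runs.incrementAndGet() < MAX_RUNS) {
                execute(r); // execute passes r through unwrapped, so it really runs again
            }
        }
    }

    // Runs one counting task through the pool and returns how many times it ran.
    static int countRuns() {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(MAX_RUNS);
        RetryPool pool = new RetryPool();
        pool.execute(() -> {
            counter.incrementAndGet();
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);  // wait until the last run has happened
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countRuns()); // prints 5
    }
}
```

Because the counter is incremented before the comparison, the task runs MAX_RUNS times in total (the original run plus MAX_RUNS - 1 retries).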

Update for your 3 questions:

  1. There are multiple options:

    • don't change the state inside Runnable (don't assign to num)
    • create new instance of Runnable (or copy instance)
    • reset the state of Runnable
  2. For the hook, I would implement it with the Decorator pattern, something like this:

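    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class YourExecutor extends ThreadPoolExecutor {

        public YourExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        // Wrap every submitted task so the hook runs after it completes.
        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(new TaskDecorator(task));
        }

        // Hook, called on the worker thread after the task returns normally.
        protected void onCompletedTask(Runnable task) {
            // callback
        }

        private class TaskDecorator implements Runnable {
            private final Runnable delegate;

            public TaskDecorator(Runnable delegate) {
                this.delegate = delegate;
            }

            @Override
            public void run() {
                this.delegate.run();
                onCompletedTask(delegate);
            }
        }
    }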

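For the "create new instance of Runnable" option, one possible sketch is to retry inside a wrapper Runnable rather than in afterExecute, building a fresh task from a factory for every attempt. This is a different technique from the afterExecute hook above. The helper retrying, the class FreshStateRetry and the simulated failure are all hypothetical names introduced only for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class FreshStateRetry {

    // Hypothetical helper: builds a new Runnable from the factory for every attempt
    // and retries inside the wrapper when an attempt throws.
    static Runnable retrying(Supplier<Runnable> factory, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return () -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    factory.get().run(); // fresh instance, so fresh state
                    return;              // success, stop retrying
                } catch (RuntimeException e) {
                    last = e;            // remember the failure and try again
                }
            }
            throw last; // every attempt failed
        };
    }

    // Demo: each attempt starts with num = 0; the first two attempts fail on purpose.
    // Returns {number of attempts, value of num seen by the last attempt}.
    static int[] demo() {
        AtomicInteger attempts = new AtomicInteger();
        AtomicInteger lastNum = new AtomicInteger();
        Supplier<Runnable> factory = () -> new Runnable() {
            int num = 0; // per-instance state, reset for every attempt

            @Override
            public void run() {
                num = num + 123;
                lastNum.set(num);
                if (attempts.incrementAndGet() < 3) {
                    throw new IllegalStateException("simulated failure");
                }
            }
        };
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pool.submit(retrying(factory, 5)).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] { attempts.get(), lastNum.get() };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("attempts=" + result[0] + " lastNum=" + result[1]);
        // prints attempts=3 lastNum=123
    }
}
```

Since the retry loop lives in the wrapper, this works the same with submit and execute, and each task carries its own attempt count instead of sharing a pool-wide counter.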
Upvotes: 2
