TheMonkWhoSoldHisCode

Reputation: 2332

Time-bound tasks using ExecutorService

I have long-running tasks submitted to an ExecutorService. A task could run for quite some time. Meanwhile, new tasks are submitted to the internal blocking queue.

On completion of the submitted task, a notification is sent back to release tasks from the queue for execution. However, sometimes the notification is not triggered, either because of programming errors or network issues. In such scenarios, my task queue could grow quite large, and tasks might lie in the queue forever.

To overcome this, I am thinking of writing a thread that periodically checks how long a task has been lying idle in the queue. If a task has been in the queue for, say, 15 minutes, I will assume that the earlier submitted task encountered an error and hence did not return. I will then evict the task from the queue and allow it to be executed.
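
A minimal sketch of that idea, assuming each queued task is wrapped with its enqueue time and a single daemon thread sweeps the head of the queue. The PendingTask wrapper, the pool size and the sweep interval are illustrative, and only the watchdog path is shown (the normal submit/notify path is omitted):

import java.util.concurrent.*;

public class StalledTaskWatchdog {

    // Illustrative wrapper: remembers when the task entered the queue
    static class PendingTask {
        final Runnable task;
        final long enqueuedAt = System.currentTimeMillis();
        PendingTask(Runnable task) { this.task = task; }
    }

    private static final long MAX_IDLE_MILLIS = TimeUnit.MINUTES.toMillis(15);
    private final BlockingQueue<PendingTask> queue = new LinkedBlockingQueue<PendingTask>();
    private final ExecutorService workers = Executors.newFixedThreadPool(100);

    public void start() {
        Thread sweeper = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        PendingTask head = queue.peek();
                        if (head != null && System.currentTimeMillis() - head.enqueuedAt > MAX_IDLE_MILLIS) {
                            // Assume the completion notification was lost and force-release the task
                            PendingTask stale = queue.poll();
                            if (stale != null) {
                                workers.submit(stale.task);
                            }
                        }
                        Thread.sleep(TimeUnit.MINUTES.toMillis(1)); // sweep interval
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        sweeper.setDaemon(true);
        sweeper.start();
    }
}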

Is there any existing mechanism to handle this, or will I have to write this custom logic?

Note:
The reason I am not preferring a ScheduledExecutorService is that not all tasks are to be executed periodically; only failure scenarios should execute after a certain delay.

EDIT: Brief overview of the architecture. The solution I am designing should support many concurrent static file downloads; typically there could be thousands of download requests. The download requests are triggered from a UI-based application, so I know when the requests will be triggered. Taking advantage of this, I intend to throttle the download requests.

What happens when a user triggers, say, 300 download requests?

  1. Application worker threads create 300 tasks for download
  2. 100 tasks are submitted. I have a defined MAX HTTP thread pool size of, say, 100, which means I can support a maximum of 100 synchronous parallel downloads (servlet 2.5). Each task in turn asks the remote HTTP client to perform an HTTP GET. Note that the HTTP threads are not in play yet.
  3. Remaining 200 requests are queued.
  4. The HTTP client performs an HTTP GET. The HTTP thread now streams the response in a BLOCKING fashion.
  5. Once I receive a 200 OK, I create a notification to indicate that one of the clients has completed its download.
  6. The throttle will now release/submit one of the tasks from the previously queued 200 requests (a minimal sketch of this flow follows the list).
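
A minimal sketch of that flow, assuming a simple in-flight counter plus a pending queue stands in for the throttle. The class and method names (DownloadThrottle, submitDownload, onDownloadComplete) are illustrative, not the actual application code:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DownloadThrottle {

    private static final int MAX_PARALLEL = 100;        // matches the MAX HTTP thread pool size
    private final Queue<Runnable> pending = new LinkedList<Runnable>();
    private final ExecutorService workers = Executors.newFixedThreadPool(MAX_PARALLEL);
    private int inFlight = 0;

    // Steps 1-3: run the task straight away if a slot is free, otherwise queue it
    public synchronized void submitDownload(Runnable httpGetTask) {
        if (inFlight < MAX_PARALLEL) {
            inFlight++;
            workers.submit(httpGetTask);     // step 4: task asks the remote client to do the GET
        } else {
            pending.add(httpGetTask);        // remaining requests wait in the queue
        }
    }

    // Steps 5-6: called by the completion notification (HTTP 200/500) to release the next task
    public synchronized void onDownloadComplete() {
        Runnable next = pending.poll();
        if (next != null) {
            workers.submit(next);            // one download out, one in: slot count unchanged
        } else {
            inFlight--;
        }
    }
}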

In scenarios where I do receive a response (HTTP 200, HTTP 500, etc.), the throttling mechanism works like a charm. But in cases where, for example, the servlet itself threw an exception, I get no response back to indicate that an HTTP worker thread is free, so a task could remain in the queue forever. To overcome this, I was thinking of a timer-based approach: if there is no HTTP response for 15 minutes, submit the next-in-queue task for execution. A kind of fallback mechanism to avoid major memory leaks.

Upvotes: 1

Views: 1186

Answers (1)

Bohemian

Reputation: 425033

Limit the maximum amount of time allowed for a task by calling get() with a timeout; on catching TimeoutException, perform your cleanup.

Here's an implementation that doesn't block the main thread while waiting:

import java.util.concurrent.*;

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService monitor = Executors.newFixedThreadPool(99);

public void submit(final Runnable task) {
    Runnable monitorTask = new Runnable() {
        @Override
        public void run() {
            // run the real task on the worker pool and wait (with a timeout) for it to finish
            Future<?> future = executor.submit(task);
            try {
                future.get(15, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                // retry waiting. iterative approach not shown here
            } catch (ExecutionException e) {
                // your task exploded
            } catch (TimeoutException e) {
                // your task timed out - perform clean up, eg
                future.cancel(true);
            }
        }
    };
    monitor.submit(monitorTask);
}

Separate thread pools are used to prevent the situation where no thread is available to monitor but a thread is available to execute the task, which would lead to an unmonitored task.
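
A hypothetical usage note: every piece of work would go through the monitored submit() above rather than straight to executor; the download body here is just a placeholder:

submit(new Runnable() {
    @Override
    public void run() {
        // perform the blocking HTTP download here
    }
});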

Upvotes: 5
