utsav madaan

Reputation: 21

Java ExecutorService - Task/Callable not cancelling/interrupting

I am using a Java ExecutorService (thread pool) to run a task and update the UI while a particular activity is in the foreground (visible).

Problem: When the user switches to another activity, I want to stop/cancel all tasks, whether queued or running. To do this I have to either call the ExecutorService shutdown/shutdownNow method, or call cancel(true) on the Future returned by the ExecutorService submit method after checking the Future's status with isDone(). This should set the interruption flag of the corresponding thread to true, which I then check with Thread.currentThread().isInterrupted() in my Callable implementation to decide whether to exit the task/thread.

The problem is that, whether I call the ExecutorService shutdown method (shutdownNow() in the code below) or Future.cancel(true), the thread interruption flag is set to true only rarely (about 1 in 10 times), which eventually leads to memory leaks, etc.
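For reference, here is a minimal sketch of the mechanism described above, outside Android and with a task that does nothing but poll the flag. The class and method names (CancelDemo, taskSeesInterrupt) are made up for illustration and are not part of the code in this question. Under these assumptions, cancel(true) on a running task interrupts its worker thread, and a loop that polls isInterrupted() should notice it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelDemo {

    // Returns true if the running task observed the interrupt requested by cancel(true).
    public static boolean taskSeesInterrupt() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Future<?> future = pool.submit(() -> {
            started.countDown();
            long n = 0;
            // Simulated CPU-bound work that polls the interrupt flag on every iteration.
            while (!Thread.currentThread().isInterrupted()) {
                n++;
            }
            sawInterrupt.set(true);
            finished.countDown();
        });

        started.await();                          // make sure the task is really running
        boolean cancelled = future.cancel(true);  // interrupts the worker thread
        boolean exited = finished.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return cancelled && exited && sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task saw interrupt: " + taskSeesInterrupt());
    }
}
```

If a stripped-down task like this stops reliably but the real one does not, the difference is probably in the work done inside the loop rather than in the executor.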

Code:

ThreadPool singleton implementation (cancelAll cancels tasks, shutdownExecutor shuts down the ExecutorService):

private static class ThreadPoolManager {

    private ExecutorService executorService;
    private List<Future> queuedFutures;
    private BlockingQueue<Runnable> blockingQueue;

    private static ThreadPoolManager instance;

    private ThreadPoolManager() {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
        queuedFutures = new ArrayList<>();
        blockingQueue = new LinkedBlockingDeque<>();
        executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
    }

    static {
        instance = new ThreadPoolManager();
    }

    public static void submitItemTest(Callable<Object> callable) {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
        if(instance.executorService.isShutdown()){
            instance=new ThreadPoolManager();
        }
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    }

    public static void submitTestAll(Callable<Object> callable) {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
        if(instance.executorService.isShutdown()){
            instance=new ThreadPoolManager();
        }
        cancelAll();
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    }

    public static void cancelAll() {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
        instance.blockingQueue.clear();
        for (Future future : instance.queuedFutures) {
            if (!future.isDone()) {
                boolean cancelled = future.cancel(true);
                MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
            }
        }
        instance.queuedFutures.clear();
    }

    public static void shutdownExecutor(){
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
        instance.executorService.shutdownNow();
    }
}

Callable implementation (plain iteration with an if clause to check for interruption):

private Callable<Object> getTestAllCallable() {
        return new Callable<Object>() {
            @Override
            public Object call() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if (!Thread.currentThread().isInterrupted()) {
                          //someWork

                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                        return null;
                    }
                }
                return null;
            }
        };
    }

Activity/Fragment onStop implementation (invokes task cancellation and shutdown):

@Override
public void onStop() {
    MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
    ThreadPoolManager.cancelAll();
    ThreadPoolManager.shutdownExecutor();
    super.onStop();
}

Update:

Changes made:

  1. Switched from Callable to Runnable.

  2. No longer using a singleton for the ExecutorService.

      private class ThreadPoolManager {
    
        private ExecutorService executorService;
        private List<Future> queuedFutures;
        private BlockingQueue<Runnable> blockingQueue;
    
        private ThreadPoolManager() {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
            queuedFutures = new ArrayList<>();
            blockingQueue = new LinkedBlockingDeque<>();
            executorService =getNewExecutorService();
        }
    
        private ExecutorService getNewExecutorService(){
            return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
        }
    
        private void submitItemTest(Runnable runnable) {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
            if(executorService.isShutdown()){
                executorService=getNewExecutorService();
            }
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        }
    
        private void submitTestAll(Runnable runnable) {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
            if(executorService.isShutdown()){
                executorService=getNewExecutorService();
            }
            cancelAll();
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        }
    
        private void cancelAll() {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
            blockingQueue.clear();
            for (Future future : queuedFutures) {
                if (!future.isDone()) {
                    boolean cancelled = future.cancel(true);
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
                }
            }
            queuedFutures.clear();
        }
    
        private void shutdownExecutor(){
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
            executorService.shutdownNow();
            blockingQueue.clear();
            queuedFutures.clear();
        }
    }
    

I found the culprit, but not the solution yet. Below are two Runnable implementations. One of them works (isInterrupted() returns true, or an InterruptedException is thrown, and the task then ends), but the other does not.

Working Runnable (I used it for testing):

new Runnable() {
          @Override
          public void run() {
                    int i=0;
                    while(!Thread.currentThread().isInterrupted()){
                        try {
                            System.out.println(i);
                            Thread.sleep(2000); // sleep() is static, so call it on the class
                        } catch (InterruptedException e) {
                            MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
                            return;
                        }
                        i++;
                    }
                }
            }

Not working (the actual code I want to use):

new Runnable(){
            @Override
            public void run() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if (!Thread.currentThread().isInterrupted()) {

                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
                        break;
                    }
                }
            }
        };

One probable solution would be to use a boolean variable as an interruption flag inside the Runnable. I will consider that as a last resort, but I would be happy to learn what the mistake is.

Upvotes: 0

Views: 2096

Answers (2)

utsav madaan

Reputation: 21

Solution (way out): I finally went with a custom internal boolean flag in place of the thread interruption flag. MyRunnable (a custom Runnable implementation that carries the flag, so that each runnable has its own flag) checks it on every iteration. When the tasks running under the ExecutorService (thread pool) need to be cancelled, I iterate over all Future objects, look up the MyRunnable associated with each one, and set its custom flag (continueRunning) to false, which makes the task leave its loop and finish.

ThreadPoolManager:

private class ThreadPoolManager {

        private ExecutorService executorService;
        private final Map<Future,MyRunnable> queuedFutures;
        private final BlockingQueue<Runnable> blockingQueue;

        private ThreadPoolManager() {
            MyLogger.log(DEBUG, "Threadpool-created(constructor)");
            queuedFutures = new HashMap<>();
            blockingQueue = new LinkedBlockingDeque<>();
            executorService = getNewExecutorService();
        }

        private ExecutorService getNewExecutorService() {
            return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
        }

        private void submitItemTest(MyRunnable runnable) {
            MyLogger.log(DEBUG, "Threadpool-submitted item test");
            if (executorService.isShutdown()) {
                executorService = getNewExecutorService();
            }
            Future future = executorService.submit(runnable);
            queuedFutures.put(future,runnable);
        }

        private void submitTestAll(MyRunnable runnable) {
            MyLogger.log(DEBUG, "Threadpool-submitted test all");
            if (executorService.isShutdown()) {
                executorService = getNewExecutorService();
            }
            cancelAll();
            Future future = executorService.submit(runnable);
            queuedFutures.put(future,runnable);
        }

        private void cancelAll() {
            MyLogger.log(DEBUG, "ThreadPool: Cancelling all future tasks");
            blockingQueue.clear();
            for (Future future : queuedFutures.keySet()) {
                if (!future.isDone()) {
                    queuedFutures.get(future).continueRunning=false;
                    MyLogger.log(DEBUG, "Cancelled");
                }
            }
            queuedFutures.clear();
        }

        private void shutdownExecutor() {
            cancelAll();
            MyLogger.log(DEBUG, "ThreadPool: Shuttingdown threadpool");
            executorService.shutdown();
        }
    }

MyRunnable (abstract class that implements Runnable):

private abstract class MyRunnable implements Runnable {
        // volatile so that a write from the cancelling thread is visible to the pool thread
        volatile boolean continueRunning=true;
    }

MyRunnable (anonymous instance of the abstract class MyRunnable):

new MyRunnable() {
       @Override
       public void run() {
           for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                 if (continueRunning) {
                        //someWork
                 } else {
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadPool: Pool Thread Interrupted (closing down)");
                     break;
                 }
            }
            System.out.println("ThreadPool: Test complete");
         }
     };

Now, calling threadPoolManager.shutdownExecutor() makes all currently running tasks stop at their next flag check and then shuts down the pool.
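One caveat with a hand-rolled flag is that it is written by one thread and read by another, so it needs to be volatile or atomic for the pool thread to be guaranteed to see the change. Below is a minimal sketch of the same idea using AtomicBoolean. StoppableTask, stop() and shouldContinue() are hypothetical names chosen for this example, and the sleep stands in for the real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StoppableTaskDemo {

    // Hypothetical counterpart of MyRunnable with a thread-safe stop flag.
    abstract static class StoppableTask implements Runnable {
        private final AtomicBoolean continueRunning = new AtomicBoolean(true);

        void stop() {
            continueRunning.set(false);
        }

        // Honours both the custom flag and a regular thread interrupt.
        boolean shouldContinue() {
            return continueRunning.get() && !Thread.currentThread().isInterrupted();
        }
    }

    // Starts a task over totalItems items, stops it once it is running,
    // and returns how many items were processed before it exited.
    public static int runAndStop(int totalItems) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicInteger processed = new AtomicInteger();

        StoppableTask task = new StoppableTask() {
            @Override
            public void run() {
                for (int i = 0; i < totalItems && shouldContinue(); i++) {
                    started.countDown();           // no-op after the first call
                    processed.incrementAndGet();
                    try {
                        Thread.sleep(10);          // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // keep the interrupt visible
                    }
                }
            }
        };

        pool.execute(task);
        started.await();
        task.stop();                               // request cooperative cancellation
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("items processed before stop: " + runAndStop(1000));
    }
}
```

Checking the interrupt status alongside the flag keeps the task responsive to Future.cancel(true) and shutdownNow() as well, so both cancellation paths lead to the same exit.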

Upvotes: 0

zuckermanori

Reputation: 1755

According to ExecutorService documentation, shutting down an executing task is done on a best effort basis.

Therefore, when you call ExecutorService.shutdownNow(), the implementation will try to stop all currently executing tasks, typically by interrupting their threads. Each task will keep running until it reaches a point where it detects that it was interrupted.
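As a rough illustration of that behavior, the sketch below uses a single-thread pool with one running task and two queued ones. ShutdownNowDemo and its run() method are names invented for this example. shutdownNow() returns the tasks that never started, and awaitTermination() reports whether the interrupted task actually finished in time.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {

    // Returns {number of tasks that never started, 1 if the pool terminated in time else 0}.
    public static int[] run() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);

        // The first task occupies the only worker and exits once it is interrupted.
        pool.execute(() -> {
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                // simulated work
            }
        });
        // These two stay in the queue because the single worker is busy.
        pool.execute(() -> { });
        pool.execute(() -> { });

        started.await();
        // Interrupts the running worker and drains the queue.
        List<Runnable> neverStarted = pool.shutdownNow();
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { neverStarted.size(), terminated ? 1 : 0 };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run();
        System.out.println("never started: " + result[0] + ", terminated: " + (result[1] == 1));
    }
}
```

If awaitTermination() times out, a task is still running, which usually means it never reached a point where it checks for the interrupt.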

To ensure that your threads reach that point early, it is a good idea to check in your loop whether the thread has been interrupted, as follows:

Thread.currentThread().isInterrupted();

By making this call on every iteration, your threads will detect the interruption shortly after it actually happens.

And so your modified Callable code will look as follows:

private Callable<Object> getTestAllCallable() {
    return new Callable<Object>() {
        @Override
        public Object call() {
            for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                if(Thread.currentThread().isInterrupted()) {
                    return null;
                }
                if(someCondition) {
                    //someWork
                } else {
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                    return null;
                }
            }
            return null;
        }
    };
}
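If the check still only fires occasionally, one possible cause (an assumption on my part, since the //someWork body is not shown) is that a blocking call inside the loop catches InterruptedException and carries on. Methods such as Thread.sleep() clear the interrupt flag when they throw, so the next isInterrupted() check sees false unless the catch block restores it. A minimal sketch, with someWork() as a hypothetical stand-in for the real work:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RestoreInterruptDemo {

    // Hypothetical stand-in for the elided "//someWork": a blocking call
    // that catches InterruptedException itself.
    static void someWork(boolean restoreFlag) {
        try {
            Thread.sleep(25);
        } catch (InterruptedException e) {
            // sleep() clears the interrupt flag when it throws.
            if (restoreFlag) {
                Thread.currentThread().interrupt(); // restore it for the loop check
            }
        }
    }

    // Returns true if the loop's isInterrupted() check noticed the cancellation.
    public static boolean loopSeesInterrupt(boolean restoreFlag) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch working = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Future<?> future = pool.submit(() -> {
            for (int i = 0; i < 20; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    sawInterrupt.set(true);
                    return;
                }
                working.countDown(); // signals the first pass; later calls are no-ops
                someWork(restoreFlag);
            }
        });

        working.await();        // the task is past its first check
        future.cancel(true);    // interrupt arrives around the blocking call
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("flag swallowed, loop noticed: " + loopSeesInterrupt(false));
        System.out.println("flag restored, loop noticed:  " + loopSeesInterrupt(true));
    }
}
```

With the flag restored, the loop is expected to exit on its next check. With the exception swallowed, the loop typically runs to the end even though cancel(true) was called, which would match the "works only rarely" symptom from the question.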

By the way, there is no point in using a Callable if you do not intend to return a value from the call() method. If you need a parameterized type in your task, just create a parameterized Runnable as follows:

public class ParameterizedRunnable<T> implements Runnable {
    private final T t;

    public ParameterizedRunnable(T t) {
        this.t = t;
    }

    public void run() {
        //do some work here
    }
}

Upvotes: 1
