Denis Kulagin
Denis Kulagin

Reputation: 8947

Safe thread utilization

I am using single thread executor for long-running threads like this:

executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
executor.submit(new LongRunnable());

which checks a flag to be stopped:

private class LongRunnable implements Runnable {
        @Override
        public void run() {
            while (isRunning.get()) {
                try {
                    doSomething();
                } catch (InterruptedException e) {
                    ...
                }
            }

        }
    }

and whole execution is interrupted that way:

@Override
public void close() throws Exception {
    isRunning.set(false);
    executor.shutdownNow();
}

Still I can see some threads not gc-ed in profiler (while by logs, runnable they were executing has quit outermost while loop).

Question: does provided working with threads strategy memory-leak-free and thread-leak-free?

Upvotes: 2

Views: 134

Answers (2)

Nathan Hughes
Nathan Hughes

Reputation: 96454

Here's a sample program using the single-thread Executor that manages to strand a thread so that the JVM can't shut down, but it only manages to do it by not calling shutdownNow:

import java.util.concurrent.*;

public class Exec {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(new MyTask());
        Thread.sleep(20000L);
//        executor.shutdownNow();
        int retryCount = 4;
        while (!executor.isTerminated() && retryCount > 0) {
            System.out.println("waiting for tasks to terminate");
            Thread.sleep(500L);
            retryCount -= 1;
        }       
    }
}

class MyTask implements Runnable {
    public void run() {
        int count = 0;
        try {
            while (!Thread.currentThread().isInterrupted() && count < 10) {
                Thread.sleep(1000L);
                count += 1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("all done");
    }
}

The thread used by the executor has a separate life cycle from the task, this example shows how the task finishes but the thread goes on. Uncommenting the shutdownNow results in the executor's thread terminating. Otherwise the main thread sleeps for a while and exits, leaving the executor's thread hanging out, preventing the JVM from exiting.

My guess is that your close method isn't getting called and your executor never gets shut down. To get more useful answers please add a MVCE so that we can reproduce the problem.

Consider that with interruption there's no need to keep a reference to the Runnable to set the flag. As I read the question the task not finishing is not an issue here, but it would still be better to make the Runnable respond to interruption and lose the flag, just because having less things to keep track of is always an improvement.

Upvotes: 1

hemant1900
hemant1900

Reputation: 1226

I am not able to see any issue with executor or shutDownNow. Probably you are looking at different threads in your profiler.

Try this program which is similar to the one in your question and you can see the thread is no longer there after successful shutdown.

public class ExecutorShutdownTest {

private static ExecutorService executor;
private static AtomicLong executorThreadId = new AtomicLong(0);

public static void main(String[] args) {
    // get thread MX bean
    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    // create an executor and start the task
    executor = Executors.newSingleThreadExecutor(new TestThreadFactory());
    LongRunnable runnable = new LongRunnable();
    executor.submit(runnable);
    // main thread: keep running for sometime
    int count = 5;
    while (count-- > 0) {
        try {
            Thread.sleep(1000);
            System.out.println(String.valueOf(threadMXBean.getThreadInfo(executorThreadId.longValue())).replace("\r", "").replace(
                    "\n", ""));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // main thread: stop the task
    try {
        runnable.close();
        System.out.println(String.valueOf(threadMXBean.getThreadInfo(executorThreadId.longValue())).replace("\r", "").replace("\n", ""));
    } catch (Exception e) {
        e.printStackTrace();
    }
    // main thread: run some more time to verify the executor thread no longer exists
    count = 5;
    while (count-- > 0) {
        try {
            Thread.sleep(1000);
            System.out.println(String.valueOf(threadMXBean.getThreadInfo(executorThreadId.longValue())).replace("\r", "").replace("\n", ""));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

private static class LongRunnable implements Runnable {

    private volatile boolean isRunning = true;

    @Override
    public void run() {
        while (isRunning) {
            System.out.println("Running");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //ignore
            }
        }
        System.out.println("Stopped");
    }

    public void close() throws Exception {
        System.out.println("Stopping");
        isRunning = false;
        executor.shutdownNow();
    }
}

private static class TestThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    TestThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0) {
            @Override protected void finalize() throws Throwable {
                super.finalize();
                // probably bad idea but lets see if it gets here
                System.out.println("Executor thread removed from JVM");
            }
        };
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        executorThreadId.set(t.getId());
        System.out.println("Executor thread created");
        return t;
    }
}

}

Upvotes: 1

Related Questions