Pmarcoen
Pmarcoen

Reputation: 1236

ThreadPoolExecutor - Changing the pool size even if no threads become idle

I need some help to better understand how the ThreadPoolExecutor works.

I have 1000's of tasks to be handled, this number is known at the start of my program.

I set up a ThreadPoolExecutor and would like to change the number of threads it uses on the fly so that if the server is not that active (e.g. at night), we can increase the number of threads that it can use.

Increasing the number of threads works fine, the problem I have is that when I try to decrease the number of threads by using setCorePoolSize.

From what I understand, this value only gets changed when the thread becomes idle. Since there are constantly tasks to process, this thread never becomes idle and so it is never shutdown.

What should I be using then to decrease the number of threads?

This is my demo code:

package com.test.executortest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 *
 * @author peter.marcoen
 */
public class ExecutorTest {

    final static Logger log = LogManager.getLogger(ExecutorTest.class);

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor x = new ThreadPoolExecutor(1, 100, 30, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>());
        for (int i = 1; i <= 10; i++) {
            RunnableTest r = new RunnableTest();
            r.num = String.valueOf(i);
            x.execute(r);
        }

        x.shutdown();

        int i = 0;
        while (!x.isTerminated()) {
            i++;
            log.debug("Active count: {}, Core pool size: {}, Maximum pool size: {}, Pool size: {}", x.getActiveCount(), x.getCorePoolSize(), x.getMaximumPoolSize(), x.getPoolSize());
            if (i == 2) {
                log.info("!!!! Setting core pool size to 2 !!!!");
                x.setCorePoolSize(2);    
            } else if (i == 10) {
                log.info("!!!! Setting core pool size to 1 !!!!");
                x.setCorePoolSize(1);
            }
            Thread.sleep(1000);
        }
    }
}

With RunnableTest just sleeping for 5 seconds:

package com.test.executortest;

import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.logging.log4j.LogManager;

/**
 *
 * @author peter.marcoen
 */
public class RunnableTest implements Runnable {
    public String num;
    final static org.apache.logging.log4j.Logger log = LogManager.getLogger(RunnableTest.class);

    @Override
    public void run() {
        log.debug("Started parsing #{}", num);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ex) {
            Logger.getLogger(RunnableTest.class.getName()).log(Level.SEVERE, null, ex);
        }
        log.debug("Finished parsing #{}", num);
    }

}

And this is the output. As you can see, even after setting the coreThreads back to 1, there are always 2 tasks being processed:

11:43:45.696 - Active count: 1, Core pool size: 1, Maximum pool size: 100, Pool size: 1
11:43:45.696 - Started parsing #1
11:43:46.710 - Active count: 1, Core pool size: 1, Maximum pool size: 100, Pool size: 1
11:43:46.710 - !!!! Setting core pool size to 2 !!!!
11:43:46.710 - Started parsing #2
11:43:47.724 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:48.738 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:49.738 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:50.705 - Finished parsing #1
11:43:50.705 - Started parsing #3
11:43:50.752 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:51.719 - Finished parsing #2
11:43:51.719 - Started parsing #4
11:43:51.766 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:52.780 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:53.794 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:54.795 - Active count: 2, Core pool size: 2, Maximum pool size: 100, Pool size: 2
11:43:54.795 - !!!! Setting core pool size to 1 !!!!
11:43:55.715 - Finished parsing #3
11:43:55.715 - Started parsing #5
11:43:55.809 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:43:56.729 - Finished parsing #4
11:43:56.729 - Started parsing #6
11:43:56.822 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:43:57.836 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:43:58.851 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:43:59.865 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:00.723 - Finished parsing #5
11:44:00.723 - Started parsing #7
11:44:00.879 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:01.739 - Finished parsing #6
11:44:01.739 - Started parsing #8
11:44:01.880 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:02.894 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:03.908 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:04.922 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:05.733 - Finished parsing #7
11:44:05.733 - Started parsing #9
11:44:05.936 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:06.749 - Finished parsing #8
11:44:06.749 - Started parsing #10
11:44:06.936 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:07.937 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:08.937 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:09.951 - Active count: 2, Core pool size: 1, Maximum pool size: 100, Pool size: 2
11:44:10.747 - Finished parsing #9
11:44:10.965 - Active count: 1, Core pool size: 1, Maximum pool size: 100, Pool size: 1
11:44:11.762 - Finished parsing #10

Upvotes: 2

Views: 1527

Answers (1)

Zim-Zam O&#39;Pootertoot
Zim-Zam O&#39;Pootertoot

Reputation: 18148

Rather than dynamically changing the size of a core ThreadPoolExecutor (which as you've discovered can be a bit flaky), in the past I've created an auxiliary ThreadPoolExecutor to steal tasks from the core executor - this allowed me to terminate the auxiliary executor without impacting the core executor. Ideally we'd just be able to have the two executors share one work queue, but this appeared to introduce weird concurrency bugs and was more trouble than it was worth - better to keep the queues separate and to use afterExecute to keep the auxiliary executor's pool filled.

public class AuxiliaryExecutor extends ThreadPoolExecutor {
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<Runnable> coreQueue;

    private void pollAndExecute() {
        try {
            this.execute(coreQueue.poll());
        } catch(NullPointerException e) {
            this.execute(new Runnable() {
                public void run() {
                    try {
                        this.execute(coreQueue.take());
                    } catch(InterruptedException e) {
                        return;
                    }
                }
            });
        }
    }

    public AuxiliaryExecutor(ThreadPoolExecutor coreExecutor, int poolSize) {
        super(poolSize, poolSize, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(poolSize));
        this.coreQueue = coreExecutor.getQueue();
        for(int i = 0; i < poolSize; i++) {
            pollAndExecute();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        if(!shutdown.get()) {
            pollAndExecute();
        }
    }

    @Override
    public void shutdown() {
        shutdown.set(true);
    }
}

If you want to change the size of the auxiliary executor then you can instead shut down the auxiliary executor and replace it with a new resized executor.

Upvotes: 1

Related Questions