qmalt

Reputation: 23

Java thread pool not reusing threads

Hi everyone. I seem to misunderstand how thread pools work: the actual behavior differs from what the API documentation of ThreadPoolExecutor describes. When I use a LinkedBlockingQueue in the thread pool, it does not reuse threads. The pool waits for the keepAliveTime that was set in the constructor, then kills the thread and creates a new one. When I set keepAliveTime to something small, like 1 second or less, it deletes the thread and recreates it. But if I set it to a minute, new threads aren't created because maximumPoolSize doesn't allow it, and the queue is already full, so all tasks are rejected, while the threads whose keepAliveTime is a minute do nothing during this time.

I am quite new to this and don't understand why it doesn't reuse these threads. After keepAliveTime expires it kills the thread and, if the queue is full, creates a new one. Why does it work this way? As far as I understand from the API, it should reuse threads that are idle during keepAliveTime. It reuses threads when I use SynchronousQueue, but not with LinkedBlockingQueue.

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    private volatile int remainingTasksCount;
    // corePoolSize 1, maximumPoolSize 2, keepAliveTime 1 minute, bounded queue of capacity 3
    private volatile static ThreadPoolExecutor consumer = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));

    private static Runnable task = () -> {
        System.out.println(String.format("consumer %s, id %s, size %s, active count %s, queue %s",
                Thread.currentThread().getName(), Thread.currentThread().getId(),
                consumer.getPoolSize(), consumer.getActiveCount(), 3-consumer.getQueue().remainingCapacity()));
        // Simulate about one second of work
        String s = new String();
        synchronized (s) {
            try {
                s.wait(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };


    public static void main(String[] args) throws IOException {
        try {
            // Producer thread: submit a burst of 5 tasks, then pause for 10 seconds
            new Thread(() -> {
                while (true) {
                    try {
                        for (int i = 0; i < 5; i++) {
                            consumer.submit(task);
                        }
                        System.out.println("PUSH TASKS");
                        synchronized (Thread.currentThread()) {
                            Thread.currentThread().wait(10000);
                        }
                    } catch (Throwable th) {
                        // A rejected submit lands here, so the 10-second pause above is skipped
                        System.out.println(th);
                    }
                }
            }).start();
        } catch (Throwable th) {
            System.out.println(th);
        }
    }
}

OUTPUT

PUSH TASKS
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 0
Disconnected from the target VM, address: '127.0.0.1:64434', transport: 'socket'

Process finished with exit code 1

But the next time the producer submits tasks, I get a RejectedExecutionException.

If I change keepAliveTime to 1 second, everything works, but the pool creates new threads:

PUSH TASKS
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
consumer pool-1-thread-3, id 17, size 1, active count 1, queue 2
PUSH TASKS
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 2
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0

I would be glad if someone could explain my mistake, or point out a basic principle that I missed.

Upvotes: 1

Views: 2902

Answers (2)

Mikita Harbacheuski

Reputation: 2253

I think your misunderstanding of how the thread pool works comes from your code sample. I ran it and got output from 5 tasks followed by an endless stream of RejectedExecutionException. This happens because, when the exception is thrown, Thread.currentThread().wait(10000); is skipped, so 5 more tasks are submitted to the pool immediately, and this repeats again and again, producing new exceptions. Try surrounding consumer.submit(task); with a try-catch block and you will see that only two threads process all tasks, as expected, because keepAliveTime is longer than the wait time. In the second sample keepAliveTime is shorter than the wait time, so after each wait a new non-core thread is created and you see different ids after each loop iteration. That is correct, because the previous non-core thread was stopped after being idle for longer than keepAliveTime.
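A minimal sketch of the try-catch suggestion, assuming the same pool parameters as in the question. The class name GuardedSubmitDemo, the helper pause, the 100 ms task duration and the 1-second pause between bursts are made up for illustration; the only point is that a rejected submit no longer skips the pause, and that the same worker threads come back in every round while the pause stays below keepAliveTime.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class GuardedSubmitDemo {

    // Hypothetical helper: sleep without leaking the checked exception.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits `rounds` bursts of 5 tasks and returns the names of the threads that ran them. */
    static Set<String> run(int rounds) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        Runnable task = () -> {
            workerNames.add(Thread.currentThread().getName());
            pause(100); // simulated work
        };
        for (int round = 0; round < rounds; round++) {
            for (int i = 0; i < 5; i++) {
                try {
                    pool.submit(task);
                } catch (RejectedExecutionException e) {
                    // Only this one task is lost; the loop and the pause below still run.
                    System.out.println("round " + round + ": task " + i + " rejected");
                }
            }
            pause(1000); // well below the 1-minute keepAliveTime, so idle workers survive
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workerNames;
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + run(3));
    }
}
```

Because maximumPoolSize is 2 and no worker is idle for a full minute, the returned set can never hold more than two names, however many rounds run. Individual tasks in later rounds may still be rejected, depending on timing.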

Upvotes: 0

tevemadar

Reputation: 13205

It is a race condition. If you follow submit() long enough (in the source code), you will arrive at ThreadPoolExecutor.execute():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /* long comment block removed */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

When your submit loop runs for the first time, execute will create new workers and hand them your tasks directly, without pushing those tasks into the queue (addWorker+return for the core worker, and the final addWorker for the second worker once the queue is full), so 2 tasks start immediately, and 3 go into the queue, which can accommodate all 3 of them.

On the second run, the submit calls all end up at workQueue.offer, which may saturate the queue (depending on how quickly the workers get around to consuming the new items), and when it does, the last-resort addWorker will run and fail, resulting in reject, because both workers already exist and no new ones may be created.

In practice, if you start doing 'things' in your submit loop, it will eventually start working. For example, I tried println(i), and that was slow enough for some tasks to be consumed and the loop to succeed. When I tried print(i), that was already too fast: it died on the 4th submit, so no tasks had been consumed in time. So it is a delicate matter, as race conditions usually are.
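The order of the steps in execute can be made visible without relying on timing. The sketch below is an illustration, not the code from the question: the class SaturationDemo and the method submitBlocked are hypothetical names, and the tasks block on a CountDownLatch so that no worker can drain the queue while the submissions are in progress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {

    /** Submits `count` tasks that all block, recording the pool state after each call. */
    static List<String> submitBlocked(int count) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until all submissions are done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> outcomes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            try {
                pool.execute(blocked);
                outcomes.add("workers=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                outcomes.add("rejected");
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return outcomes;
    }

    public static void main(String[] args) {
        submitBlocked(6).forEach(System.out::println);
    }
}
```

With six submissions, the first task starts the core worker, the next three fill the queue, the fifth starts the second worker only because offer failed, and the sixth is rejected. In the question's second round both workers already exist, so the rejection comes as soon as the queue holds 3 unconsumed tasks, which matches the failure on the 4th submit.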

Upvotes: 3
