jabawaba

Reputation: 279

Why is BlockingQueue.take() not releasing the thread?

In this simple short program, you will notice that the program hangs forever because the take() does not release the thread. According to my understanding, take() causes the thread to be released even though the task itself is blocked on take().

Edited:

This works (thanks to you all for fixing the autoboxing):

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducersConsumers {
    private static int THREAD_COUNT = 5;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final ExecutorService executorPool = Executors.newFixedThreadPool(THREAD_COUNT);
        final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();

        Collection<Future<Long>> collection = new ArrayList<Future<Long>>();


        // producer:
        for (int i = 0; i < 20; i++) {
            collection.add(executorPool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    for (int i = 100; i >= 0; i--) {
                        queue.put((long) i);
                    }
                    return -1L;
                }
            }));
        }

        // consumer:
        for (int i = 0; i < 20; i++) {
            collection.add(executorPool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    while (true) {
                        Long item = queue.take();
                        if (item.intValue() == 0) {
                            break;
                        }
                    }
                    return 1L;
                }
            }));
        }

        long sum = 0;
        for (Future<Long> item : collection) {
            sum += item.get();
        }

        executorPool.shutdown();
        System.out.println("sum = " + sum);
    }
}

But if you swap the producer and consumer invocations, it will hang:

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducersConsumers {
    private static int THREAD_COUNT = 5;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final ExecutorService executorPool = Executors.newFixedThreadPool(THREAD_COUNT);
        final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();

        Collection<Future<Long>> collection = new ArrayList<Future<Long>>();


        // consumer:
        for (int i = 0; i < 20; i++) {
            collection.add(executorPool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    while (true) {
                        Long item = queue.take();
                        if (item.intValue() == 0) {
                            break;
                        }
                    }
                    return 1L;
                }
            }));
        }

        // producer:
        for (int i = 0; i < 20; i++) {
            collection.add(executorPool.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    for (int i = 100; i >= 0; i--) {
                        queue.put((long) i);
                    }
                    return -1L;
                }
            }));
        }

        long sum = 0;
        for (Future<Long> item : collection) {
            sum += item.get();
        }

        executorPool.shutdown();
        System.out.println("sum = " + sum);
    }
}

To my understanding, the order of the producers and consumers should not matter. In other words, there is a notion of a task and a notion of a thread. Threads are independent of the program code, whereas a task is associated with a particular piece of code. Therefore, in my example, when the JVM assigns a thread to execute one of the Callable tasks, if the consumers are submitted first, the task will block on take(). Once the JVM discovers that the task is blocked, it will release the thread (or so I understand, but it is not releasing it) and place it back in the worker thread pool in preparation for processing a runnable task (which in this case are the producers). Consequently, once all the Callables have been submitted, there should be 40 tasks but only 5 threads; 20 of those tasks are blocked, 5 of the tasks should be running and 15 should be waiting (to run).

Upvotes: 1

Views: 10062

Answers (4)

ChrisWue

Reputation: 19020

I think you misunderstand how threads and threadpools work. A threadpool typically has a work item queue which contains items to be worked on (in your case Callable<>s).

It also contains a (maximum) number of threads (in your case 5) which can work on those items.

The lifetime of an active thread is defined by the code it executes, usually a method. The thread becomes "alive" when it starts executing the method and it ends when the method returns. If the method blocks to wait on some signal, it does not mean that the thread can go away and execute some other method; that's not how threads work. Instead, the thread stays blocked until it can continue execution. While it is blocked, the CPU is free to run other threads, but this thread itself does nothing else.

The method which is run by a threadpool thread usually looks like this:

void threadLoop() throws Exception
{
    while (!quit)
    {
        Callable<?> item = null;
        synchronized (workQueue)
        {
            // nothing to do yet: sleep until someone calls notify() on the queue
            if (workQueue.isEmpty())
                workQueue.wait();

            // we could have been woken up for some other reason, so check again
            if (!workQueue.isEmpty())
                item = workQueue.poll();
        }
        // run the user's task outside the lock; this call may block indefinitely
        if (item != null)
            item.call();
    }
}

This is more or less pseudo code (I'm not a Java developer) but it should show the concept. Now item.call() executes the method which is supplied by the user of the pool. If that method blocks, then what happens? Well, the thread will be blocked in its execution of item.call() until the method wakes up again. It can't just go away and execute some other code arbitrarily.
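The point that a blocked task keeps its worker thread can be checked with a small experiment. The following is a minimal sketch, assuming Java 8 or later; the class name BlockedWorkerDemo, the method observe() and the 300 ms wait are illustrative choices, not part of the original question. It uses a pool with a single thread, submits one task that blocks in take(), and then a second, trivial task.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockedWorkerDemo {

    // Reports whether a second task could run before and after the first
    // task's take() was satisfied, on a pool with a single worker thread.
    // (Hypothetical helper, written only for this illustration.)
    static String observe() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        CountDownLatch consumerStarted = new CountDownLatch(1);
        CountDownLatch secondTaskRan = new CountDownLatch(1);
        try {
            // Task 1: occupies the only worker thread and blocks in take().
            Future<Long> consumer = pool.submit(() -> {
                consumerStarted.countDown();
                return queue.take();
            });

            // Task 2: trivial, but it needs a free worker thread to run.
            Runnable second = secondTaskRan::countDown;
            pool.submit(second);

            // Wait until task 1 is running, then give task 2 a chance to start.
            consumerStarted.await();
            boolean ranWhileBlocked = secondTaskRan.await(300, TimeUnit.MILLISECONDS);

            // Unblock task 1 from the main thread; only then is the worker free.
            queue.put(42L);
            consumer.get(5, TimeUnit.SECONDS);
            boolean ranAfterUnblock = secondTaskRan.await(5, TimeUnit.SECONDS);

            return "ranWhileBlocked=" + ranWhileBlocked
                    + ", ranAfterUnblock=" + ranAfterUnblock;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(observe());
    }
}
```

On a single-thread pool this should print ranWhileBlocked=false, ranAfterUnblock=true: the second task only gets the worker thread once the first task has returned from take().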

Upvotes: 2

matt b

Reputation: 139971

I think you've misunderstood what gets "blocked" in a BlockingQueue.

The call to queue.take() blocks the thread that invoked it until something is available in the queue. This means that the thread will wait there endlessly, unless interrupted, until an item is added to the queue.

The second code sample hangs because you are submitting 20 tasks that wait for an item to appear in the BlockingQueue, and the executor has just 5 threads in it. The first five tasks therefore cause all five threads to block, and the remaining 15 consumer tasks sit in the executor's work queue behind them.

The second for-loop then submits 20 producer tasks that would add items to the queue, but they can never be executed, because all threads in the executor are stuck waiting.

So when you say this:

According to my understanding, take() causes the thread to be released even though the task itself is blocked on take().

You have a misunderstanding because there is no difference here between what the "thread" does and what the "task" does. A thread cannot be "released" while the task is blocked - it is the thread that runs the task. When the thread encounters a blocking call to take(), the thread is blocked, period.
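One way to make the consumers-first order work follows directly from this: make sure the blocked consumers can never occupy every thread. The sketch below is an illustration under assumptions, not the asker's code: it needs Java 8 or later, the class name ConsumersFirst and the method run(int) are hypothetical, and sizing the pool to one more thread than there are consumers is just the smallest choice that leaves a thread free for producers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConsumersFirst {

    // Submits the consumers before the producers, as in the hanging sample,
    // but sizes the pool so that blocked consumers cannot use up every thread.
    // (Hypothetical helper, written only for this illustration.)
    static long run(int taskPairs) throws Exception {
        // Assumption: taskPairs + 1 threads, i.e. one more than the number of
        // consumers, so at least one thread is always free to run a producer.
        ExecutorService pool = Executors.newFixedThreadPool(taskPairs + 1);
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        List<Future<Long>> futures = new ArrayList<>();
        try {
            // Consumers first: each blocks in take() until it receives a 0.
            for (int n = 0; n < taskPairs; n++) {
                Callable<Long> consumer = () -> {
                    while (queue.take() != 0L) {
                        // discard non-zero items and keep taking
                    }
                    return 1L;
                };
                futures.add(pool.submit(consumer));
            }

            // Producers second: each puts 100 down to 0, so one 0 per consumer.
            for (int n = 0; n < taskPairs; n++) {
                Callable<Long> producer = () -> {
                    for (long i = 100; i >= 0; i--) {
                        queue.put(i);
                    }
                    return -1L;
                };
                futures.add(pool.submit(producer));
            }

            // The timeout turns an unexpected hang into a TimeoutException.
            long sum = 0;
            for (Future<Long> f : futures) {
                sum += f.get(10, TimeUnit.SECONDS);
            }
            return sum;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sum = " + run(20));
    }
}
```

With 20 consumers and 20 producers this uses 21 threads and the sum comes out as 0, the same as in the producers-first version. Using separate executors for producers and consumers would be another option; the only requirement is that producers always have a thread to run on.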

Upvotes: 1

Femi

Reputation: 64700

From javadoc:

Retrieves and removes the head of this queue, waiting if no elements are present on this queue.

It will wait: you're running in main, so it will stay there.

EDIT: correction: the blocking still happens (in the thread pool threads, not in main). There is no yielding going on: all 5 pool threads are blocked on the take calls, so no put calls execute, so the Futures never complete, so the program hangs.

Upvotes: 2

Alex Gitelman

Reputation: 24732

I don't know what exactly you mean by "release the thread", but once you block on take(), the calling thread is blocked and does not go back to the pool.

Upvotes: 1
