Bas Opers

Reputation: 287

ExecutorService: how to prevent thread starvation when synchronization barriers are done in the threads

I have a situation for which I am having trouble finding a clean solution. I'll try to explain it in as much detail as possible.

I have a tree-like structure:

NODE A
    NODE A.1
    NODE A.2
        NODE A.2.a
        NODE A.2.b
    NODE A.3
        NODE A.3.a
        NODE A.3.b
        NODE A.3.c
NODE B
    NODE B.1
    NODE B.2

I need to process the root node:

 public void process(final Node node) { ... }

Processing a node involves two things:

- some database queries
- processing all children of that node

In other words, once NODE A.2.a and NODE A.2.b are processed, NODE A.2 can be processed. I am processing the nodes in a recursive fashion, nothing spectacular.

So far, so good. Now I want to declare a global executor service with a fixed number of threads and process the child nodes of a node in parallel, so that NODE A.2.a and NODE A.2.b can each be processed in their own thread. The code would look something like this:

// global executor service, shared between all process(Node) calls
final ExecutorService service = Executors.newFixedThreadPool(4);

public void process(final Node node) throws InterruptedException {
    // database queries
    ...

    // wait for children to be processed
    final CountDownLatch latch = new CountDownLatch(node.children().size());

    for (final Node child : node.children()) {
        service.execute(() -> {
            try {
                process(child);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                latch.countDown();
            }
        });
    }
    latch.await();
}

The problem here is: once a certain depth is reached, all pool threads are blocked in latch.await(), and no thread is left to process the children they are waiting for. We have reached a situation of thread starvation.

This could easily be solved by making the executor service unbounded, but I don't like that option: I want to control the number of active threads. In my case the number of active threads should equal the number of cores; having more active threads would introduce context switches from one thread to another, which I would like to avoid.

How can this be solved?

Upvotes: 12

Views: 3341

Answers (5)

Persixty

Reputation: 8589

It is technically a deadlock. We usually think of deadlock as two or more processes waiting on locks, but here the threads are waiting on each other. Interestingly, it's a way of producing that strange beast, the self-deadlock: with a pool of one thread, the thread submits tasks and waits on them, but they never execute, because the only thread that could run them is the one waiting for them to finish!
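For what it's worth, that single-thread case is easy to reproduce with the standard executors; this tiny (hypothetical) demo hangs forever because the only worker is blocked waiting for a task that only it could run:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SelfDeadlock {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {
            // The inner task can only run on the pool's single thread,
            // which is busy right here waiting for it: a self-deadlock.
            Future<?> inner = pool.submit(() -> System.out.println("never runs"));
            inner.get();
            return null;
        });
    }
}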

The standard answer is called a 'work-stealing thread pool'. I came across exactly the same issue, and without that piece of vocabulary it took me ages to find any information on what I quickly worked out was a common problem in any recursive algorithm executed as fully concurrent recursion.

OK, this is a sketch of how it works. Create a fairly standard thread pool, with one trick: when a thread reaches a point where it cannot proceed without the result of a queued item, it checks whether that item has been started by another thread. If it hasn't, the current thread executes it itself (rather than waiting); otherwise it waits for the thread executing that item to finish.
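For what it's worth, Java's ForkJoinPool already behaves roughly this way: a worker that joins a pending subtask helps execute queued subtasks instead of parking. A minimal sketch of the question's recursion on top of it (the NodeTask name is mine, and I assume a Node type exposing children() as in the question):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class NodeTask extends RecursiveAction {
    private final Node node;

    NodeTask(Node node) {
        this.node = node;
    }

    @Override
    protected void compute() {
        // this node's own work (the question's database queries)
        // ...

        List<NodeTask> subtasks = new ArrayList<>();
        for (Node child : node.children()) {
            subtasks.add(new NodeTask(child));
        }
        // invokeAll forks the subtasks and joins them; while "waiting",
        // this worker helps execute queued subtasks, so a fixed number
        // of workers never starves itself.
        invokeAll(subtasks);
    }
}

// usage: bounded parallelism, e.g. one worker per core
// new ForkJoinPool(Runtime.getRuntime().availableProcessors()).invoke(new NodeTask(root));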

It's very simple but may require you to build your own thread-pool class because off-the-shelf solutions often don't support it.

Do beware, of course, that a typical non-trivial recursive algorithm will break down into many more sub-tasks than you have parallel processing units, so it may make sense to enqueue subtasks only down to some level and then execute the remainder in a single thread.

The point is that unless enqueuing and dequeuing items is cheap, you can spend your time putting items into a queue only to take them straight back out, and those operations may be serialised among the threads, reducing parallelism. Lock-free queues are hard to use here because they don't usually permit extraction from the middle of the queue (as required here). So you should limit how deep you go parallel.
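As a rough illustration of limiting the parallel depth (the PARALLEL_DEPTH constant and the method shape are my assumptions, not part of the answer), the recursion could fan out only near the root and fall back to plain sequential processing further down:

// Hypothetical sketch: parallelise only the top PARALLEL_DEPTH levels of the tree.
static final int PARALLEL_DEPTH = 2;

void process(Node node, int depth) {
    // this node's own work (database queries)
    // ...
    if (depth < PARALLEL_DEPTH) {
        // near the root: fan the children out in parallel
        node.children().parallelStream()
                .forEach(child -> process(child, depth + 1));
    } else {
        // deeper down: plain recursion in the current thread, no queueing overhead
        for (Node child : node.children()) {
            process(child, depth + 1);
        }
    }
}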

On the plus side, notice that executing a task in the currently executing thread involves less task swapping overhead than parking the current thread and (at the OS level) swapping in another worker.

Here is a Java resource that provides a WorkStealingThreadPool. Please note I have made no effort to evaluate the implementation and offer no recommendation. I've been working in C++ in this case, otherwise I would happily share my template.

https://www.jrebel.com/blog/using-java-executors

Also refer to what Wikipedia has to say: https://en.wikipedia.org/wiki/Work_stealing

Upvotes: 3

maricn

Reputation: 633

newFixedThreadPool() backs the pool with a LinkedBlockingQueue that holds tasks submitted while all threads are busy.

That queue is unbounded by default, which means any tasks received once the thread pool's capacity is filled up are queued and executed later, when the pool has free slots again.

And CountDownLatch#await() will not busy-wait; it parks the awaiting thread rather than keeping it on the CPU.

So your problem is basically that you exhaust all available threads, which stay blocked instead of being reused, and no new threads can be created.

All you need to solve the issue is to slightly modify the default behavior of newFixedThreadPool(): create a thread pool with a queue of limited size, and change what happens when that queue fills up so that the submitting thread runs the new task itself.

ExecutorService executorService = new ThreadPoolExecutor(
        4,                                          // core pool size
        4,                                          // maximum pool size
        0L, TimeUnit.MILLISECONDS,                  // keep-alive for idle threads
        new ArrayBlockingQueue<>(4),                // bounded work queue
        new ThreadPoolExecutor.CallerRunsPolicy()); // queue full: the submitting thread runs the task itself

See CallerRunsPolicy, ArrayBlockingQueue.

Upvotes: 0

edharned

Reputation: 1904

It seems like you want an alternative to recursion. That alternative is scatter-gather: submit a request that forks all tasks. When NODE A.2 executes, it simply returns, since its child nodes have not completed yet. When the last of those child tasks (A.2.a, A.2.b) completes, completion processing kicks in and processes NODE A.2.
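One way to express this scatter-gather style with plain java.util.concurrent (not the product mentioned below) is CompletableFuture composition: the parent's work is registered as a completion callback on its children instead of a blocking wait, so no pool thread is ever parked. A minimal sketch, assuming the question's Node type and a processNode() method that performs only the database work:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

class ScatterGather {
    CompletableFuture<Void> processTree(Node node, ExecutorService service) {
        List<CompletableFuture<Void>> children = node.children().stream()
                .map(child -> processTree(child, service))
                .collect(Collectors.toList());
        // Run this node's own work only after every child future has completed;
        // the dependency is a callback, not a thread blocked in await().
        return CompletableFuture.allOf(children.toArray(new CompletableFuture[0]))
                .thenRunAsync(() -> processNode(node), service);
    }

    private void processNode(Node node) {
        // ... database queries ...
    }
}

At the top level, processTree(root, service).join() blocks only the caller, never a pool thread.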

I maintain a Data Parallel open-source product that can do exactly that. Rather than fetching the entire product, simply download the documentation (http://coopsoft.com/JavaDoc.html) and look at the file Manual/DynamicJoin.html. If that solves your problem, then you can fetch the product.

Upvotes: 1

Anton

Reputation: 6061

I would not use multiple error-prone CountDownLatches. As I understand it, you want to traverse the tree from the leaf nodes up to the parent nodes, in parallel.

I would traverse the tree using a simple BFS/DFS (you don't need a recursive algorithm there); once a leaf (a node without children) is found, put it into a blocking queue.

The queue is polled by a second thread, which schedules a new task on an executor service with a fixed number of threads.

Once an executor thread finishes processing the node it took from the queue, it checks whether that node has a parent; if it does, and all of the parent's children have been processed, the parent is put into the queue as well. You should also check whether a node has already been processed before working on it.

Something like this:

    BlockingQueue<Node> blockingQueue = new LinkedBlockingQueue<>();
    ExecutorService service = Executors.newFixedThreadPool(4);

    // Consumer thread: takes "ready" nodes off the queue and hands them to the pool.
    new Thread(() -> {
        try {
            while (true) {
                Node node = blockingQueue.take();   // blocks until a node is ready
                service.execute(() -> {
                    if (node.isProcessed()) {
                        return;
                    }
                    // ... do your job (database queries) ...
                    node.setProcessed(true);

                    if (node.hasParent()) {
                        Node parent = node.getParent();
                        boolean allChildrenProcessed = true;
                        for (Node child : parent.getChildren()) {
                            allChildrenProcessed &= child.isProcessed();
                        }
                        // The parent becomes ready once all of its children are done.
                        if (allChildrenProcessed) {
                            blockingQueue.offer(parent);
                        }
                    }
                });
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }).start();

    // Seed the queue with the leaves, found by an iterative DFS.
    Deque<Node> stack = new ArrayDeque<>();
    stack.push(root);
    while (!stack.isEmpty()) {
        Node node = stack.pop();
        for (Node child : node.getChildren()) {
            stack.push(child);
        }
        if (node.getChildren().isEmpty()) {
            // leaf node: ready for processing immediately
            blockingQueue.add(node);
        }
    }

Upvotes: 1

senseiwu

Reputation: 5279

What you really want could be achieved using

int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(cores);

Remember also that the javadoc of getActiveCount() says it returns only an approximate number of actively executing threads.

Upvotes: 0
