user68883
user68883

Reputation: 838

Multi threaded program using newFixedThreadPool doesn't run as excepted when the thread pool size is less than the number of tasks executed

    package com.playground.concurrency;

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class MyRunnable implements Runnable {
        private String taskName;

        public String getTaskName() {
            return taskName;
        }

        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }

        private int processed = 0;

        public MyRunnable(String name) {
            this.taskName = name;
        }

        private boolean keepRunning = true;

        public boolean isKeepRunning() {
            return keepRunning;
        }

        public void setKeepRunning(boolean keepRunning) {
            this.keepRunning = keepRunning;
        }

        private BlockingQueue<Integer> elements = new LinkedBlockingQueue<Integer>(10);

        public BlockingQueue<Integer> getElements() {
            return elements;
        }

        public void setElements(BlockingQueue<Integer> elements) {
            this.elements = elements;
        }

        @Override
        public void run() {
            while (keepRunning || !elements.isEmpty()) {
                try {
                    Integer element = elements.take();
                    Thread.sleep(10);
                    System.out.println(taskName +" :: "+elements.size());
                    System.out.println("Got :: " + element);
                    processed++;
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
            System.out.println("Exiting thread");

        }

        public int getProcessed() {
            return processed;
        }

        public void setProcessed(int processed) {
            this.processed = processed;
        }

    }

package com.playground.concurrency.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.playground.concurrency.MyRunnable;

public class TestService {
    public static void main(String[] args) throws InterruptedException {
        int roundRobinIndex = 0;
        int noOfProcess = 10;
        List<MyRunnable> processes = new ArrayList<MyRunnable>();
        for (int i = 0; i < noOfProcess; i++) {
            processes.add(new MyRunnable("Task : " + i));
        }
        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(5);

        for (MyRunnable process : processes) {
            threadPoolExecutor.execute(process);
        }
        int totalMessages = 1000;
        long start = System.currentTimeMillis();
        for (int i = 1; i <= totalMessages; i++) {
            processes.get(roundRobinIndex++).getElements().put(i);
            if (roundRobinIndex == noOfProcess) {
                roundRobinIndex = 0;
            }
        }
        System.out.println("Done putting all the elements");
        for (MyRunnable process : processes) {
            process.setKeepRunning(false);
        }
        threadPoolExecutor.shutdown();
        try {
            threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long totalProcessed = 0;
        for (MyRunnable process : processes) {
            System.out.println("task " + process.getTaskName() + " processd " + process.getProcessed());
            totalProcessed += process.getProcessed();
        }
        long end = System.currentTimeMillis();
        System.out.println("total time" + (end - start));

    }

}

I have a simple task that reads elements from a LinkedBlockingQueue. I create multiple instances of these tasks and execute by ExecutorService . This programs works as expected when the noOfProcess and thread pool size is same.(For ex: noOfProcess=10 and thread pool size=10).

However , if noOfProcess=10 and thread pool size =5 then the main thread keeps waiting at the below line after processing a few items.

processes.get(roundRobinIndex++).getElements().put(i);

What am i doing wrong here ?

Upvotes: 0

Views: 104

Answers (2)

Kaan
Kaan

Reputation: 5754

To fix the behavior, you need to do one of the following:

  • have enough Runnables, each with enough queue capacity to take all 1,000 messages (for example: 100 Runnables with capacity 10 or more; or 10 Runnables with capacity 100 or more), or
  • have a thread pool that is large enough to accomodate all of your Runnables so that each of them can start running.

Without one of those happening, the ExecutorService will not start the extra Runnables. The main worker thread will continue adding items to each queue, including those of non-running Runnables, until it encounters a queue that is full, at which point it blocks. With 10 Runnables and thread pool size 5, the first queue to fill up will the be the 6th Runnable. This is the same if you had just 6 Runnables. The significant point is that you have at least one more Runnable than you have room in your thread pool.

From newFixedThreadPool() Javadoc:

If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

Consider a simpler example of 2 processes and thread pool size of 1. You'll be allowed to create the first process and submit it to the ExecutorService (so the ExecutorService will start and run it). The second process however, will not be allowed to run by the ExecutorService. Your main thread does not pay attention to this, however, and it will continue putting elements into the queue for the second process even though nothing is consuming it.

Your code is ok with noOfProcess=10 and thread pool size=5 – if you also change your queue size to 100, like this: new LinkedBlockingQueue<>(100).

You can observe this behavior – where the queue of a non-running Runnable fills up – if you change this line:

processes.get(roundRobinIndex++).getElements().put(i);

to this (which is the same logical code, but has object references saved for use inside the println() output):

MyRunnable runnable = processes.get(roundRobinIndex++);
BlockingQueue<Integer> elements = runnable.getElements();
System.out.println("attempt to put() for " + runnable.getTaskName() + " with " + elements.size() + " elements");
elements.put(i);

Upvotes: 0

Prouflon
Prouflon

Reputation: 88

Ah yes. The good old deadlock.

What happens is: You submit 10 Tasks to the ExecutorService, and then send jobs via .put(i). This blocks for Task 5 as expected when its queue is full. Now Task 5 is not currently being executed, and as a matter of fact will never be, since Task 0 to 4 are still clogging up your FixedThreadPool, blocking at .take() in the run() Method waiting for new Jobs from .put(i), which they will never get.

This error is a fundamental design flaw within your code and there are myriads of ways to fix it, one of which being the increased Thread Pool Size.

My suggestion is that you go back to the drawing board and rethink the structure in the main Method.

And since you posted your code, have some tips:

1.: Posting your entire code can be interpreted as a call to 'pls fix my code', and you are encouraged to omit all uneccessary details (like all those getters and setters). Maybe check https://stackoverflow.com/help/minimal-reproducible-example

2.: Posting two classes in the same body made things kinda complicated. Split it next time.

3.: (nitpick) processes.get(roundRobinIndex++).getElements().put(i); Combining two operations like you did here is bad style since it makes your code less readable for others. You could just have written: processes.get(i % noOfProcesses).getElements().put(i);

Upvotes: 2

Related Questions