SyncMaster
SyncMaster

Reputation: 9936

How to use fixed number of threads to run certain number tasks in Java

I have a specific number of threads THREAD_POOL_SIZE and a bunch of tasks which can exceed the number of threads. I want to use these k threads to run all my tasks. I have these tasks in a BlockingQueue and each thread returns a result which should be aggregated later.

Here I wrote a simple program where tasks are numbers from 1 to 100 and I am trying to calculate the sum of all number. Each thread will pick a number from the blocking queue and return it. I am using Future to collect my result and sum it later.

The reason for using a BlockingQueue is because I am trying to solve a bigger problem where I can have tasks in a blocking queue and I have certain number of threads to run those tasks.

I would like to know how I can fix the below code to make the k threads continue processing entries from the blocking queue?

static class Consumer implements Callable<Integer> {
    private BlockingQueue<Integer> sharedQueue;

    public Consumer(BlockingQueue<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public Integer call() {
        while(true){
            try {
                return sharedQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public static void main(String[] args) throws Exception {
    int THREAD_POOL_SIZE = 10;
    int BLOCKING_QUEUE_SIZE = 100;
    BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_SIZE);
    ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    List<Future<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < BLOCKING_QUEUE_SIZE; i++) {
        sharedQueue.add(i);
    }

    for (int i = 0; i < THREAD_POOL_SIZE; i++) {
        futures.add(execService.submit(new Consumer(sharedQueue)));
    }

    int total = 0;
    for (Future<Integer> future : futures) {
        try {
            total += future.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    System.out.println("Total count: " + total);
    execService.shutdown();
}

Thanks for your help!

Upvotes: 0

Views: 216

Answers (2)

Michael Gantman
Michael Gantman

Reputation: 7792

Even a glance at your code shows that your code is a way of an overkill. There are java classes that handle this stuff for you. Please look at ExecutorService class and Executors class. ExecutorService and its specific implementations provide a managed thread pooling. All you need to do is to invoke method newFixedThreadPool of class Executors and then submit all your tasks to that pool. You don't need any queue, your tasks will be managed by the thread pool. You can collect your Futuresand monitor to see when your tasks are finished. It is mor simple then you thought

Upvotes: 1

Michael
Michael

Reputation: 44150

You need to add 100 futures to the executor, not 10:

for (int i = 0; i < THREAD_POOL_SIZE; i++) {

Should be:

for (int i = 0; i < 100; i++) {

It's curious what you think the queue is really adding here. You can massively simplify your code without it.

int THREAD_POOL_SIZE = 10;
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
List<Future<Integer>> futures = new ArrayList<>();

for (int i = 0; i < 100; i++) {
    final int j = i;
    futures.add(execService.submit(() -> j));
}

int total = 0;
for (Future<Integer> future : futures) {
    try {
        total += future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
System.out.println("Total count: " + total);
execService.shutdown();

Upvotes: 1

Related Questions