Berkin

Reputation: 1664

Usage Of Executor Thread Pool

I am using an executor thread pool to watch a blocking queue for newly inserted objects. When an object is put on the queue, one thread from the pool wakes up, takes the object from the queue, and sends it to another class for processing.

But I'm confused about how to use the executor threads, as shown below. When I submit the tasks inside a for loop, processing is as fast as I expected, but it looks like something is wrong. When I move the submit call out of the for loop, processing gets slow. Is this logic correct?

Rest Class

@RestController
public class FraudRestController {

    @Autowired
    private CoreApplication core;

//LOGIC HERE
....

core.addMessageToQueue(rbtran, type);

}

Message Add To Queue

public static void addMessageToQueue(TCPRequestMessage message) throws InterruptedException {
    jobQueue.put(message);
}

Executor Threads To Listen Queue in Core Class

ExecutorService consumers = Executors.newFixedThreadPool(THREAD_SIZE);
//Core Inits in here
    @PostConstruct
    public void init() {
        //LOGIC
        ...
        //<---THIS BLOCK----->
        for (int i = 0; i < THREAD_SIZE; i++) { //<---- This For Loop
            consumers.submit(() -> {
                while (true)
                    sendMessageToServer();
            });
        }
        //<---THIS BLOCK----->
    }

Send Message Function

private void sendMessageToServer() throws Exception {
    //LOGIC
    ...
    if (host.isActive()) {
        // take() blocks the calling thread until a message is available
        TCPRequestMessage message = jobQueue.take();
    }
}

Upvotes: 2

Views: 154

Answers (1)

janardhan sharma

Reputation: 335

This will create a thread pool for you of the size that you pass.

ExecutorService consumers = Executors.newFixedThreadPool(THREAD_SIZE);

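This means the pool runs up to THREAD_SIZE worker threads, which take their tasks from a work queue. The threads are started as tasks are submitted, not when the pool is created. The queue created here is a LinkedBlockingQueue, which makes a thread wait when it tries to take from an empty queue or put into a full one.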

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

If a task is submitted to a pool while its queue is full, the task is not accepted. In our case no capacity was specified, so the queue's capacity is Integer.MAX_VALUE.
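To make the "queue is full" case concrete, here is a minimal sketch that builds a ThreadPoolExecutor by hand with a bounded queue. The class name BoundedQueueDemo and the capacity of 1 are assumptions chosen for illustration; newFixedThreadPool itself uses the unbounded queue shown above, so this situation should not arise with it in practice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the question's code.
public class BoundedQueueDemo {

    // Returns true if the third task is rejected by a pool with
    // one worker thread and a work queue of capacity 1.
    static boolean thirdTaskIsRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1)); // bounded, unlike newFixedThreadPool
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the task busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // runs on the single worker thread
            pool.execute(blocker); // waits in the queue
            try {
                pool.execute(blocker); // no free thread and no queue slot
                return false;
            } catch (RejectedExecutionException expected) {
                return true; // default handler (AbortPolicy) rejects the task
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    // Capacity of a LinkedBlockingQueue created without an explicit size.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<Runnable>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskIsRejected());
        System.out.println("default capacity is Integer.MAX_VALUE: "
                + (defaultCapacity() == Integer.MAX_VALUE));
    }
}
```

With the default rejection policy the caller sees a RejectedExecutionException; other policies (for example ThreadPoolExecutor.CallerRunsPolicy) behave differently.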

If the queue is empty, the idle threads in the pool wait for a task to be inserted into it. When the ExecutorService's submit method is called and the pool's threads are already running, the task is internally added to the queue through the LinkedBlockingQueue's boolean offer(E e) method.
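The flow described here can be observed with a small sketch. It constructs the pool the same way newFixedThreadPool does so that getPoolSize() and getQueue() are available. The class name SubmitFlowDemo and the task counts are assumptions for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the question's code.
public class SubmitFlowDemo {

    // Returns {threads before submit, threads after submit, tasks left in the queue}.
    static int[] observe(int nThreads, int nTasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int before = pool.getPoolSize(); // no worker threads exist yet
        for (int i = 0; i < nTasks; i++) {
            pool.submit(() -> {
                try {
                    release.await(); // hold the worker so the queue fills up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int after = pool.getPoolSize();       // one thread per task, capped at nThreads
        int queued = pool.getQueue().size();  // remaining tasks wait in the queue
        release.countDown();
        pool.shutdown();
        return new int[] {before, after, queued};
    }

    public static void main(String[] args) {
        int[] r = observe(2, 5);
        System.out.println("before=" + r[0] + " after=" + r[1] + " queued=" + r[2]);
    }
}
```

The first nThreads submissions each start a worker thread; only the later ones go through the queue, where they wait until a worker becomes free.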

Based on this, I believe you may be able to redesign what you are implementing.
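One possible shape for such a design, as a rough sketch only: each pool thread runs its own loop that blocks on take(), which is why one long-running task is submitted per thread. A single submitted loop would occupy just one thread, which may be why the version without the for loop feels slower. The class QueueConsumers, the String messages standing in for TCPRequestMessage, and the counter standing in for the real processing are all assumptions.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; String stands in for TCPRequestMessage.
public class QueueConsumers {
    static final int THREAD_SIZE = 4; // assumed value

    // Puts all messages on a shared queue and returns how many were consumed.
    static int consumeAll(List<String> messages) {
        BlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();
        ExecutorService consumers = Executors.newFixedThreadPool(THREAD_SIZE);
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messages.size());

        // One looping consumer per pool thread.
        for (int i = 0; i < THREAD_SIZE; i++) {
            consumers.execute(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        String message = jobQueue.take(); // blocks while the queue is empty
                        processed.incrementAndGet();      // stand-in for real processing of message
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the loop
                }
            });
        }
        try {
            for (String m : messages) {
                jobQueue.put(m);
            }
            done.await(5, TimeUnit.SECONDS); // wait until every message was handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumers.shutdownNow(); // interrupts consumers blocked in take()
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(List.of("a", "b", "c", "d", "e")));
    }
}
```

Catching InterruptedException and exiting the loop gives the consumers a clean way to stop, which a bare while (true) loop lacks.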

Upvotes: 1
