Reputation: 14751
I need to process a large number (>100 million) of requests on a multi-core machine. Each request processes one row of a data file and involves some I/O with a remote system (the details do not matter much, but the specific task is to load a distributed Hazelcast map from data files). The execution will be handled through a ThreadPoolExecutor: one thread reads the file and submits the data to multiple independent threads, which put it into the map. The machine has 32 cores, so there are enough available for parallel loading of the map.
Because of the large number of requests, the common approach of creating tasks and queueing them to the executor service is not feasible as the queued tasks will take up too much memory.
Which brings me to ExecutorCompletionService. With it, a task would be submitted only once a previous one has completed, which is detected by calling take() (or poll(), as applicable). This works fine once all the threads of the executor service are in use. However, "loading up all the threads" has to happen first, so there are two phases:
fill up the pool: while there are still unused threads in the pool, submit tasks to the ExecutorCompletionService without waiting between submissions
feed the pool: once all the threads are in use, submit a new task only after a previous task has finished. Rows are thus fed as quickly as possible, but no quicker, and never pile up in a queue.
The above can be coded, but I was wondering whether this logic is already implemented somewhere and I have somehow missed it. I ask because it looks like a common scenario.
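For reference, the two-phase logic described above can be sketched roughly as follows. This is a minimal sketch, not a tested solution: the pool size, the row count, and the no-op task body standing in for the Hazelcast map.put are all placeholders.

```java
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThrottledLoader {
    public static void main(String[] args) throws Exception {
        int poolSize = 4; // 32 on the real machine
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        ExecutorCompletionService<Void> ecs =
                new ExecutorCompletionService<>(pool);

        int inFlight = 0;
        for (int row = 0; row < 20; row++) { // stand-in for reading rows
            if (inFlight == poolSize) {
                ecs.take();                  // phase 2: wait for one task to finish
                inFlight--;
            }
            ecs.submit(() -> { /* map.put(...) would go here */ return null; });
            inFlight++;                      // phase 1: fill up the pool
        }
        while (inFlight-- > 0) {
            ecs.take();                      // drain the remaining tasks
        }
        pool.shutdown();
        System.out.println("done");
    }
}
```

At most poolSize tasks exist at any moment, so memory stays bounded regardless of how many rows the file contains.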
Upvotes: 2
Views: 1266
Reputation: 6276
If I understand your requirement correctly (correct me if I am wrong), you need a mechanism where there are several tasks, at most n of them run in parallel, and the rest wait in a queue; but once you submit a task, you do not want the submitting thread to block or stay busy, so it can carry on with its work.
For this scenario we use a combination of LinkedBlockingQueue and plain Thread workers. I believe a simple function can help you understand:
// Initialized at declaration: a final field cannot be assigned in a method.
private final LinkedBlockingQueue<YourTaskObjType> enqueuedTasks =
        new LinkedBlockingQueue<YourTaskObjType>();

private void initTasksProcessingThreads(int numberOfThreads)
{
    for (int i = 0; i < numberOfThreads; i++)
    {
        // Each worker thread runs forever and processes incoming tasks.
        Thread worker = new Thread(new Runnable()
        {
            public void run()
            {
                while (true)
                {
                    try
                    {
                        // Blocks until a task becomes available.
                        YourTaskObjType task = enqueuedTasks.take();
                        performTask(task); // your function that performs the task
                    }
                    catch (InterruptedException e)
                    {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        });
        worker.start();
    }
}
Then you can use a simple function to add tasks to the LinkedBlockingQueue:
// put() throws InterruptedException, so it must be declared or handled.
public void addTask(YourTaskObjType taskObj) throws InterruptedException
{
    enqueuedTasks.put(taskObj);
}
Upvotes: -1
Reputation: 6718
You can specify a BlockingQueue implementation when you create a ThreadPoolExecutor. If all you're trying to avoid is creating an excess of queued Runnable objects, you could use a bounded BlockingQueue, e.g. an ArrayBlockingQueue, and have a single thread pushing items onto the queue; it will be held back while the queue is at capacity.
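One caveat worth noting: ThreadPoolExecutor offers tasks to its queue rather than blocking on it, so with the default AbortPolicy a full queue makes execute() throw RejectedExecutionException instead of throttling the producer. A common way around this is CallerRunsPolicy, which makes the submitting thread run the task itself when the queue is full. A minimal sketch (pool size, queue capacity, and the counter standing in for map.put are placeholders):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedExecutorDemo {
    public static void main(String[] args) throws Exception {
        // Bounded work queue; CallerRunsPolicy makes the submitter run the
        // task itself when the queue is full, which naturally throttles it.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(16),
                new ThreadPoolExecutor.CallerRunsPolicy());

        AtomicInteger processed = new AtomicInteger();
        for (int row = 0; row < 1000; row++) {
            pool.execute(processed::incrementAndGet); // stand-in for map.put(...)
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(processed.get());
    }
}
```

Memory is bounded by the pool size plus the queue capacity, and every row is still processed exactly once.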
Upvotes: 4