Giovanny

Reputation: 45

Java multithreading: throttle submission to ExecutorService

I have a data file with thousands of rows. I read them and save them in the database. I want to multi-thread this process in batches of, say, 10 rows: as I read the file, each batch of 10 rows is submitted to an ExecutorService.

ExecutorService executor = Executors.newFixedThreadPool(5);

I can do the following in a while loop until I run out of rows:

 Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime);
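For illustration, the batch-saving callable might look like the sketch below. The class name `SaveBatchTask` and the in-memory list standing in for the database are hypothetical; a real version would write to the database (for example with a JDBC batch insert). Returning the row count matches the `Future<Integer>` in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for callableObjectThatSaves10RowsAtOneTime.
// The "database" is an in-memory list; a real task would use JDBC instead.
class SaveBatchTask implements Callable<Integer> {
    private final List<String> rows;
    private final List<String> database;

    SaveBatchTask(List<String> rows, List<String> database) {
        this.rows = new ArrayList<>(rows); // copy, so the reader can reuse its buffer
        this.database = database;
    }

    @Override
    public Integer call() {
        // Several worker threads share the same list, so guard the write.
        synchronized (database) {
            database.addAll(rows);
        }
        return rows.size(); // the count that Future<Integer>.get() hands back
    }
}
```

Submitting it is then `executor.submit(new SaveBatchTask(batch, db))`, which returns a `Future<Integer>`.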

But I don't want to read the entire file into memory if processing each batch of 10 rows takes a while. I only want to submit 5 batches, wait until one of the threads returns, and then submit the next one.

Say a thread takes 20 seconds to save 10 records. I don't want the ExecutorService to be fed thousands of lines just because the reading loop keeps reading the file and submitting to the ExecutorService.
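On the reading side, the first step is to pull one batch at a time instead of loading the file up front. A minimal sketch, assuming the file is line-oriented and read through a `BufferedReader`; `BatchReader`, `nextBatch` and `batchSizes` are hypothetical names:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: pulls at most batchSize lines from the reader, so only the
// current batch (not the whole file) is held in memory by the reading thread.
final class BatchReader {
    private BatchReader() {}

    /** Returns up to batchSize lines; an empty list means end of input. */
    static List<String> nextBatch(BufferedReader reader, int batchSize) throws IOException {
        List<String> batch = new ArrayList<>(batchSize);
        String line;
        // Check the size first so a full batch never consumes an extra line.
        while (batch.size() < batchSize && (line = reader.readLine()) != null) {
            batch.add(line);
        }
        return batch;
    }

    /** Demo: the sizes of the successive batches read from an in-memory "file". */
    static List<Integer> batchSizes(String fileContents, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(fileContents))) {
            List<String> batch;
            while (!(batch = nextBatch(reader, batchSize)).isEmpty()) {
                sizes.add(batch.size());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sizes;
    }
}
```

In the submission loop, `nextBatch(reader, 10)` would play the role of "read the next 10 rows", and the loop ends when it returns an empty list. This only bounds the reader's own memory; on its own it does not stop the executor's internal queue from growing, which is the throttling problem asked about here.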

What is the best way to achieve this?

Upvotes: 4

Views: 541

Answers (1)

CodeBlind

Reputation: 4569

You can do this with a LinkedList<Future<?>> that stores futures until you've reached some pre-determined size. Here's some skeleton code that should get you most of the way there:

int threads = 5;
ExecutorService service = Executors.newFixedThreadPool(threads);
LinkedList<Future<?>> futures = new LinkedList<>();

//Note: get() throws InterruptedException and ExecutionException; handle or declare them.
//As long as there are rows to save:
while(moreRowsLeft()){
    //dump another callable onto the queue:
    futures.addLast(service.submit(new RowSavingCallable()));

    //if the queue is "full", wait for the oldest task to finish before
    //reading any more of the file:
    while(futures.size() >= 2*threads) futures.removeFirst().get();
}

//All rows have been submitted but some may still be writing to the DB:
for(Future<?> f : futures) f.get();

//All rows have been saved at this point, so release the pool's threads:
service.shutdown();
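To try the idea end to end, here is a self-contained version of the skeleton in which the undefined pieces are filled in by hypothetical stand-ins: each "batch" is only a loop counter and the database write is simulated with `Thread.sleep(5)`. It also records the highest number of tasks in flight at once, which the `2 * threads` check keeps bounded.

```java
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Runnable version of the skeleton above. The "rows" are just batch numbers and the
// "database save" is a short sleep; both are hypothetical stand-ins.
final class ThrottledSubmitter {
    private ThrottledSubmitter() {}

    /** Submits totalBatches tasks and returns the most tasks ever in flight at once. */
    static int run(int totalBatches, int threads) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(threads);
        LinkedList<Future<?>> futures = new LinkedList<>();
        AtomicInteger inFlight = new AtomicInteger();    // submitted but not yet finished
        AtomicInteger maxInFlight = new AtomicInteger();
        try {
            for (int batch = 0; batch < totalBatches; batch++) {
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                futures.addLast(service.submit(() -> {
                    try {
                        Thread.sleep(5); // pretend to write a batch of rows
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }));
                // Throttle: once 2 * threads futures are outstanding, wait for the oldest.
                while (futures.size() >= 2 * threads) futures.removeFirst().get();
            }
            for (Future<?> f : futures) f.get(); // wait for the tail of the work
        } finally {
            service.shutdown();
        }
        return maxInFlight.get();
    }
}
```

With `ThrottledSubmitter.run(60, 3)`, the reported maximum should never exceed 6, no matter how many batches are submitted, because the reading loop blocks before a seventh task can be added.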

You may wonder why I've allowed the number of outstanding futures to reach twice the number of threads in the pool. This keeps the executor's threads busy with database saves while the main thread reads the file and creates more work. Because a few tasks are always queued, the I/O cost of reading the next rows and building more callables is hidden behind the database writes that the worker threads are already doing, instead of leaving those threads idle while they wait for the reader.
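One property of the LinkedList approach is that `removeFirst().get()` waits for the oldest task, even if a newer one has already finished. If that matters, a different technique (swapped in here, not part of the answer above) is a `java.util.concurrent.Semaphore` with `2 * threads` permits: the reading thread acquires a permit before each submission and every task releases its permit when it ends, so reading resumes as soon as any task completes. A minimal sketch with a simulated save; `SemaphoreThrottle` is a hypothetical name:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Alternative throttle (not from the answer above): a Semaphore with 2 * threads permits.
final class SemaphoreThrottle {
    private SemaphoreThrottle() {}

    /** Submits totalBatches simulated saves and returns how many completed. */
    static int run(int totalBatches, int threads) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(threads);
        Semaphore slots = new Semaphore(2 * threads);
        AtomicInteger saved = new AtomicInteger();
        try {
            for (int batch = 0; batch < totalBatches; batch++) {
                slots.acquire(); // the reading thread blocks here while all slots are taken
                try {
                    service.execute(() -> {
                        try {
                            saved.incrementAndGet(); // stand-in for saving one batch of rows
                        } finally {
                            slots.release(); // free a slot as soon as ANY task finishes
                        }
                    });
                } catch (RejectedExecutionException e) {
                    slots.release(); // the task never ran, so return its permit
                    throw e;
                }
            }
        } finally {
            service.shutdown(); // no new tasks; already-submitted ones still run
        }
        if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("saves did not finish in time");
        }
        return saved.get();
    }
}
```

Since nothing waits on individual futures here, errors from a real save would need to be handled or logged inside the task; the `finally` block ensures a failing save still frees its slot.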

Upvotes: 3
