Reputation: 45
I have a data file with thousands of rows. I am reading them and saving them to the database. I want to multi-thread this process in batches of, say, 50 rows. As I read the file, 10 rows at a time are submitted to an ExecutorService.
ExecutorService executor = Executors.newFixedThreadPool(5);
I can do the following in a while loop until the rows run out:
Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime);
But I don't want to read the entire file into memory if processing 10 rows takes a long time. I only want to submit 5 tasks, wait until one of the threads returns, and then submit the next.
Let's say a thread takes 20 seconds to save 10 records. I don't want the ExecutorService to be fed thousands of lines because the reading process keeps reading and submitting to it.
What is the best way to achieve this?
Upvotes: 4
Views: 541
Reputation: 4569
You can do this with a LinkedList<Future<?>> that stores futures until you've reached some pre-determined size. Here's some skeleton code that should get you most of the way there:
int threads = 5;
ExecutorService service = Executors.newFixedThreadPool(threads);
LinkedList<Future<?>> futures = new LinkedList<>();
// As long as there are rows to save:
while (moreRowsLeft()) {
    // dump another callable onto the queue:
    futures.addLast(service.submit(new RowSavingCallable()));
    // if the queue is "full", wait for the next one to finish before
    // reading any more of the file:
    while (futures.size() >= 2 * threads) futures.removeFirst().get();
}
// All rows have been submitted but some may still be writing to the DB:
for (Future<?> f : futures) f.get();
// All rows have been saved at this point; let the pool threads exit:
service.shutdown();
You may wonder why I've allowed the number of pending futures to reach twice the number of threads in the pool. This lets the executor's threads keep working on database saves while the main thread is creating more work. It helps hide any I/O cost of reading the file and building more callables while the worker threads are busy writing to the database.
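For anyone who wants to try the idea in isolation, here is a minimal self-contained sketch of the same pattern. The class name BoundedSubmitDemo, the method saveAll, and the Thread.sleep call standing in for the database save are all made up for illustration; in real code the task would be your own row-saving callable. It also uses an ArrayDeque rather than a LinkedList, which is just a preference and not required.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BoundedSubmitDemo {

    // Submits batchCount hypothetical "save" tasks while keeping at most
    // 2 * threads futures outstanding. Returns the total number of rows
    // reported as saved by the tasks.
    static int saveAll(int batchCount, int rowsPerBatch, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(threads);
        Deque<Future<Integer>> pending = new ArrayDeque<>();
        int maxPending = 2 * threads;
        int saved = 0;
        try {
            for (int batch = 0; batch < batchCount; batch++) {
                // Stand-in for "read the next batch and save it":
                // the sleep simulates a slow database write.
                pending.addLast(service.submit(() -> {
                    Thread.sleep(5);
                    return rowsPerBatch;
                }));
                // Queue is "full": block on the oldest task before
                // reading (submitting) anything else.
                while (pending.size() >= maxPending) {
                    saved += pending.removeFirst().get();
                }
            }
            // Everything is submitted; wait for the stragglers.
            for (Future<Integer> f : pending) {
                saved += f.get();
            }
        } finally {
            // Without this the pool's threads keep the JVM alive.
            service.shutdown();
        }
        return saved;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("rows saved: " + saveAll(20, 10, 5));
    }
}
```

Note that get() rethrows a task failure as an ExecutionException, so a failed save stops the reading loop here. Whether that is what you want depends on how you handle bad rows.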
Upvotes: 3