Reputation: 86845
I want to read a large file, process each line and insert the results into a database. My goal is to parallelize the processing of the lines, as each one is a long-running task. Therefore I want one thread to keep reading, multiple threads to keep processing, and one thread to keep inserting into the db in chunks.
I broke it down as follows:
1) read a file line by line sequentially (easy)
2) send each line to a thread pool (3 threads), as the processing is the long-running task. Block further line reading while the thread pool is busy.
3) write each processed line from the thread pool to a StringBuffer
4) monitor that buffer size, and write the results in chunks to a database (eg each 1000 entries)
ExecutorService executor = Executors.newFixedThreadPool(3);
StringBuffer sb = new StringBuffer();
String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
    count.getAndIncrement();
    // a lambda can only capture an effectively final variable
    final String current = line;
    Future<String> future = executor.submit(() -> {
        return processor.process(current);
    });
    //PROBLEM: this blocks until the future returns
    sb.append(future.get());
    if (count.get() == 100) {
        StringBuffer bufferChunk = sb;
        count.set(0);
        sb = new StringBuffer();
        databaseService.batchInsert(bufferChunk.toString());
    }
}
Problems:
future.get() will always block the reader until one future returns a result
the buffer "monitoring" is probably not done right
Probably I'm not doing this the right way. But how can I achieve this?
Sidenote: filesize is about 10GB, so I cannot first read the entire file into memory to prepare the parallel tasks.
Upvotes: 8
Views: 2948
Reputation: 86845
After deeper research, I found the BlockingExecutor presented in this answer comes closest to what I'm trying to achieve:
https://stackoverflow.com/a/43109689/1194415
It basically extends ThreadPoolExecutor combined with a Semaphore lock.
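For illustration, here is a minimal sketch of that idea. It is not the class from the linked answer: it wraps an ExecutorService instead of extending ThreadPoolExecutor, and the names BoundedSubmitter, BoundedSubmitterDemo and maxObservedConcurrency are made up for this example. The point is only that submit() blocks the calling (reader) thread once maxInFlight tasks are pending.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: limits the number of submitted-but-unfinished tasks.
final class BoundedSubmitter {
    private final ExecutorService delegate;
    private final Semaphore permits;

    BoundedSubmitter(ExecutorService delegate, int maxInFlight) {
        this.delegate = delegate;
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller while maxInFlight tasks are still pending.
    void submit(Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            delegate.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // free the slot even if the task throws
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // the task was rejected and will never run
            throw e;
        }
    }
}

final class BoundedSubmitterDemo {
    // Returns the highest number of tasks observed running at the same time.
    static int maxObservedConcurrency(int tasks, int maxInFlight) {
        ExecutorService pool = Executors.newCachedThreadPool();
        BoundedSubmitter submitter = new BoundedSubmitter(pool, maxInFlight);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                submitter.submit(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(5); // stands in for the long-running processing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Never exceeds 3, even though the cached pool itself is unbounded.
        System.out.println(maxObservedConcurrency(20, 3));
    }
}
```

Because the permit is released in a finally block, a failing task does not leak a slot; whether failed lines should be retried or logged is left open here.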
Upvotes: 0
Reputation: 10082
I find the following solution elegant. It is only one of many possible solutions, but it is conceptually simple.
I am only putting the actual test method here; the complete test setup and auxiliary data structures are available in a dedicated GitHub repo:
private final AtomicInteger count = new AtomicInteger();
private final Consumer<String> processor = (value) -> {
    count.incrementAndGet();
};

@Test
public void onlyReadWhenExecutorAvailable() throws Exception {
    Executor executor = Executors.newCachedThreadPool();
    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
        String value = reader.read();
        if (value == null) {
            break;
        }
        // blocks the reader while CONCURRENCY_LEVEL values are being processed
        semaphore.acquire();
        CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
            .thenAcceptAsync(v -> {
                try {
                    processor.accept(v);
                } finally {
                    // release the permit even if processing throws
                    semaphore.release();
                }
            }, executor);
        // chain the futures so that done completes only after all of them
        done = done.thenCompose($ -> future);
    }
    done.get();
    assertEquals(ENTRIES, count.get());
}
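The test above only covers reading and bounded processing. Below is a rough sketch of how the chunked database write from the question could be attached, assuming a single writer thread fed through a BlockingQueue. All names here (BatchingPipeline, run, batchInsert, BatchingPipelineDemo) are hypothetical and not taken from the repo; the Iterator stands in for the line reader and the Consumer for the database call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Function;

final class BatchingPipeline {
    // Unique end-of-input marker, compared by identity rather than by content.
    private static final String POISON = new String("EOF");

    static void run(Iterator<String> lines, Function<String, String> process,
                    Consumer<List<String>> batchInsert, int workers, int batchSize)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Semaphore permits = new Semaphore(workers);
        BlockingQueue<String> results = new LinkedBlockingQueue<>();

        // Single writer: collects results and hands them over in chunks.
        Thread writer = new Thread(() -> {
            List<String> batch = new ArrayList<>(batchSize);
            try {
                for (String r = results.take(); r != POISON; r = results.take()) {
                    batch.add(r);
                    if (batch.size() == batchSize) {
                        batchInsert.accept(new ArrayList<>(batch));
                        batch.clear();
                    }
                }
                if (!batch.isEmpty()) {
                    batchInsert.accept(batch); // flush the final partial chunk
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();

        try {
            while (lines.hasNext()) {
                String line = lines.next();
                permits.acquire(); // the reader waits here while all workers are busy
                pool.execute(() -> {
                    try {
                        results.add(process.apply(line));
                    } finally {
                        permits.release();
                    }
                });
            }
            permits.acquire(workers); // wait until every in-flight task has finished
        } finally {
            pool.shutdown();
            results.add(POISON); // always let the writer terminate
        }
        writer.join();
    }
}

final class BatchingPipelineDemo {
    // Runs the pipeline over generated lines and records the size of each chunk.
    static List<Integer> batchSizes(int lineCount, int batchSize) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < lineCount; i++) {
            lines.add("line-" + i);
        }
        List<Integer> sizes = new ArrayList<>(); // written by the writer thread, read after join()
        try {
            BatchingPipeline.run(lines.iterator(), String::toUpperCase,
                    batch -> sizes.add(batch.size()), 3, batchSize);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(250, 100)); // prints [100, 100, 50]
    }
}
```

Two caveats with this sketch: results reach the writer in completion order, not file order, and the result queue is unbounded, so it only keeps memory flat as long as the database insert is faster than the processing. A line whose processing throws is simply dropped.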
Upvotes: 2
Reputation: 99
Upvotes: 0