Reputation: 593
I am new to multi-threading in Java.
My goal is to have one thread reading a file and then passing work chunks off to worker threads to process in parallel.
There is a really good example here: Concurrency Tutorial.
This code snippet takes a list of work (an ArrayList<String> named URLs) and dumps it onto a pool of worker threads, with the work defined in the Task.call() method.
void pingAndReportEachWhenKnown() throws InterruptedException, ExecutionException {
    int numThreads = URLs.size() > 4 ? 4 : URLs.size(); // max 4 threads
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    CompletionService<PingResult> compService = new ExecutorCompletionService<>(executor);

    for (String url : URLs) {
        Task task = new Task(url);
        compService.submit(task);
    }
    for (String url : URLs) {
        Future<PingResult> future = compService.take();
        log(future.get());
    }
    executor.shutdown(); // always reclaim resources
}
This is exactly what I want to do, but I need one change. The size of my work queue will not fit into working memory (the file is HUGE), so I need to buffer the read lines. I can achieve the blocking I need with an ArrayBlockingQueue. But then I also need to buffer the assignment of tasks to the CompletionService. Work chunk size will vary, so completion time will vary as well.
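For reference, the producer side I have in mind looks roughly like this (just a sketch; the queue capacity, the file name, and turning a single line into a Task are placeholders):

BlockingQueue<Task> arrayBlockingQueue = new ArrayBlockingQueue<>(100); // capacity is a placeholder

Thread readerThread = new Thread(() -> {
    try (BufferedReader in = new BufferedReader(new FileReader("huge.txt"))) { // placeholder path
        String line;
        while ((line = in.readLine()) != null) {
            arrayBlockingQueue.put(new Task(line)); // blocks while the queue is full
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // stop reading if interrupted
    }
});
readerThread.start();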
How do I avoid putting too much on the compService work queue? The following code would put the items on one at a time, because it waits for a completion before taking another task from the queue. So this is insufficient. What is the right or best way to handle this?
for (;;) {
    Task task = arrayBlockingQueue.take(); // blocking operation
    compService.submit(task);
    Future<PingResult> future = compService.take(); // blocking operation
    log(future.get());
}
Upvotes: 2
Views: 1266
Reputation: 27115
Instead of calling Executors.newFixedThreadPool(numThreads), you can directly create a ThreadPoolExecutor. One of the constructors for that class allows you to provide the queue that the thread pool will use.

So, provide it with a bounded queue (e.g., an ArrayBlockingQueue with a fixed capacity): when the queue is full, your producer thread will block, and it will stop reading the file until some of the work has been done.
John Vint said:

That unfortunately won't work. Once the queue is full and all threads are busy, a RejectedExecutionException will be thrown.
What if you use this constructor?
ExecutorService executorService = new ThreadPoolExecutor(
        CORE_POOL_SIZE, MAX_POOL_SIZE,
        KEEP_ALIVE_TIME, TimeUnit.SECONDS, // the unit here is just an example
        new ArrayBlockingQueue<>(TASK_BACKLOG_LIMIT),
        new ThreadPoolExecutor.CallerRunsPolicy()
);
The javadoc for ThreadPoolExecutor talks about what happens when a task is rejected (e.g., because the queue is full). It says that the default behavior is to throw RejectedExecutionException, but...

... Four predefined handler policies are provided ... (2) In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted...
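Putting that together, here's a minimal sketch (it reuses Task and PingResult from the question; the method name, file path, and queue capacity are placeholders, and a separate consumer thread would still drain the CompletionService as in the question's second loop):

void readAndSubmitAll() throws IOException {
    ExecutorService executor = new ThreadPoolExecutor(
            4, 4,                              // fixed pool of 4, as in the question
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(100),     // bounded backlog; 100 is a placeholder
            new ThreadPoolExecutor.CallerRunsPolicy());
    CompletionService<PingResult> compService = new ExecutorCompletionService<>(executor);

    try (BufferedReader in = new BufferedReader(new FileReader("huge.txt"))) { // placeholder path
        String line;
        while ((line = in.readLine()) != null) {
            // When the queue is full, CallerRunsPolicy makes this thread run the
            // task inline, so reading naturally slows down to match the workers.
            compService.submit(new Task(line));
        }
    }
    executor.shutdown();
}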
Upvotes: 2
Reputation: 5215
As far as I can tell, there's no compelling reason to differentiate work-producer and worker threads. Instead of using an ArrayBlockingQueue, you could simply make the read operation thread-safe.
public static class SynchronizedBufferedReader extends BufferedReader {

    public SynchronizedBufferedReader(final Reader in) {
        super(in);
    }

    @Override
    public synchronized String readLine() throws IOException {
        return super.readLine();
    }
}
Instead of tasks producing one result each, each Task can take the same Reader and do work in a loop until readLine returns null. This way, you can create just as many Tasks as you have threads, and all will remain busy.
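A sketch of what such a Task might look like (assuming it returns a count of processed lines; processLine is a placeholder for the real work):

public static class Task implements Callable<Integer> {
    private final BufferedReader reader; // the shared SynchronizedBufferedReader

    public Task(BufferedReader reader) {
        this.reader = reader;
    }

    @Override
    public Integer call() throws IOException {
        int processed = 0;
        String line;
        while ((line = reader.readLine()) != null) { // synchronized, so safe to share
            processLine(line); // placeholder for the real per-line work
            processed++;
        }
        return processed;
    }
}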
Upvotes: 1
Reputation: 40256
I think what you're really looking for is a semaphore. You can acquire a resource and ensure the task will release it when done. That should give you the throttling you want.
If you use this with CompletableFuture, you should have a bit cleaner code.
Semaphore semaphore = new Semaphore(NUMBER_OF_QUEUED_TASKS);
ExecutorService executor = Executors.newFixedThreadPool(numThreads);

for (String url : URLs) {
    semaphore.acquire(); // if too many tasks have been queued,
                         // wait here until one is released
    CompletableFuture
            .supplyAsync(new Task(url), executor)
            .thenAccept(this::log)
            .whenComplete((r, ex) -> semaphore.release()); // release even if the task failed
}
executor.shutdown();
...
public static class Task implements Supplier<PingResult> {
    private final String url;
    public Task(String url) { this.url = url; }

    @Override
    public PingResult get() {
        return ping(url); // ping(...) is a placeholder for the real work
    }
}
Upvotes: 0
Reputation: 1577
I did something similar to this before: you can use a BufferedReader and send a StringBuffer to a worker thread for processing after a set threshold of lines has been read.
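A rough sketch of that approach (using a StringBuilder since the buffer isn't shared while being filled; the threshold, the ChunkTask type, and the reader and pool variables are assumptions):

int threshold = 1000; // lines per chunk; a placeholder value
StringBuilder chunk = new StringBuilder();
int count = 0;
String line;
while ((line = reader.readLine()) != null) {
    chunk.append(line).append('\n');
    if (++count == threshold) {
        pool.execute(new ChunkTask(chunk.toString())); // ChunkTask is hypothetical
        chunk.setLength(0); // reuse the buffer for the next chunk
        count = 0;
    }
}
if (count > 0) {
    pool.execute(new ChunkTask(chunk.toString())); // flush the final partial chunk
}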
Another option is to split the file into multiple smaller files, sending each file to a thread once it has been created.
Here is an example of splitting a large file into multiple smaller files for processing. Splitter is a class which just grabs a set number of lines from the large file and places them into a smaller file.
private void execute() throws InterruptedException {
    File segFile;
    Splitter split = new Splitter(maxLines, file);
    while ((segFile = split.nextSegment()) != null) {
        String seg = segFile.getName().substring(segFile.getName().lastIndexOf("_") + 1);
        Runnable thread;
        if (workflow.equals("Account")) {
            thread = new AccountThread(segFile);
        } else {
            thread = new CustomerThread(segFile);
        }
        pool.execute(thread);
    }
    pool.shutdown();
    // Block until all submitted work has finished instead of busy-waiting.
    pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
}
Upvotes: 1