Reputation: 153
I have tried to find a way how to load several web pages in multiple threads with a certain maximum limit of threads, in a way such that a new page is loaded when one finish. There should also be another post-processing threads for the loaded content after the page is downloaded so that the whole process is chained.
How I'd like to make it:
Task queue holds the pages that should be downloaded
Threadpool has a certain number of threads downloading the pages in the task queue (loading the pages take some time so the number of threads can be much higher than the number of the cpu cores)
When a page's download is completed, the thread should notify this so that a new task from the queue can be started instead
When a page's download is completed, the content should be transferred to another task queue for the post-processing
Another threadpool has as many threads as the cpu has cores (guess it is fastest to have only one thread per core for the post-processing), this threadpool perform post-processing on the downloaded pages.
When a page's post-processing is completed, the thread should notify it so that another page in the queue can be post-processed
When all pages has been downloaded (the queue is empty), the first threadpool can be shutdown, the other threadpool can be shutdown when both task queues are empty (all pages has been downloaded and post-processed)
I have something like:
for (int j = 0; j < threads.length; j++) {
threads[j].start();
}
for (int j = 0; j < threads.length; j++) {
threads[j].join();
}
But this way all pages to load are in separate threads at the same time and I want to limit the number of threads. More importantly I want to reuse the threads and make a thread do the next task when one task is finished. I could do that with a while loop, but this is what I'm trying to avoid, I don't want a while loop to check if the queue has more tasks and if a thread is free. Is it possible to use some kind of callback, so that the thread tells back to the pool that is is completed and returns the data. I also want the downloading tasks to ~store the content in a data structure and add it futher to the post-processing task queue.
The best resources I found so far is: Thread pools Callback
But I don't know if it even is possible to create it the way I want. I' stuck in thinking about function pointers.
Upvotes: 2
Views: 6379
Reputation: 1206
You can use Guava's ListenableFutures.
First you need to submit download tasks to the ListenableExecutorService, then transform resulting futures with post processor via Futures.transform.
ListenableExecutorService dlPool = MoreExecutors.listeningDecorator(firstPool);
ListenableExecutorService procPool = MoreExecutors.listeningDecorator(secondPool);
List<ListenableFuture<Result>> results = new ArrayList<...>();
for (String url : urls) {
// download task
ListenableFuture<String> html = dlPool.submit(...);
// post process
ListenableFuture<Result> result = Futures.transform(html,
new Function<String, Result>() {
... // post process
}, procPool);
results.add(result);
}
// blocks until all results are processed
List<Result> processed = Futures.allAsList(results).get();
firstPool.shutdownNow();
secondPool.shutdownNow();
Upvotes: 1
Reputation: 692081
Don't use low-level thread methods to do that. Have a downloadExecutor
thread pool, and submit DownloadTask
instances (implementing Runnable
or Callable
) to this pool.
At the end of the DownloadTask's code, submit the a PostProcessPageTask instance (once again implementing implementing Runnable
or Callable
) to a second postProcessExecutor
thread pool.
You could use one or two CountDownLatch instances that every task would decrement when finished, and have the main thread awaiting on this (or these) latches to know when the thread pools must be shut down.
See http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html and docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html for more information.
Upvotes: 2