user979899

Reputation: 153

Java task queue, threadpool and threads with callback to inform when a new task can start

I have been trying to find a way to load several web pages in multiple threads, with a maximum limit on the number of threads, such that a new page is loaded as soon as one finishes. There should also be separate post-processing threads that handle the content after each page is downloaded, so that the whole process is chained.

How I'd like to make it:

I have something like:

    for (int j = 0; j < threads.length; j++) {
        threads[j].start();
    }

    for (int j = 0; j < threads.length; j++) {
        threads[j].join();
    }

But this way all pages are loaded in separate threads at the same time, and I want to limit the number of threads. More importantly, I want to reuse the threads, so that a thread picks up the next task as soon as it finishes one. I could do that with a while loop, but that is what I'm trying to avoid: I don't want a while loop that checks whether the queue has more tasks and whether a thread is free. Is it possible to use some kind of callback, so that the thread tells the pool that it is completed and returns the data? I also want the downloading tasks to store the content in a data structure and pass it on to the post-processing task queue.

The best resources I have found so far are: Thread pools, Callback

But I don't know if it is even possible to build it the way I want. I'm stuck thinking about function pointers.

Upvotes: 2

Views: 6379

Answers (2)

sanjary

Reputation: 1206

You can use Guava's ListenableFutures.

First you need to submit the download tasks to a ListeningExecutorService, then transform the resulting futures with the post-processor via Futures.transform.

ListeningExecutorService dlPool = MoreExecutors.listeningDecorator(firstPool);
ListeningExecutorService procPool = MoreExecutors.listeningDecorator(secondPool);

List<ListenableFuture<Result>> results = new ArrayList<>();
for (String url : urls) {
  // download task
  ListenableFuture<String> html = dlPool.submit(...);
  // post process
  ListenableFuture<Result> result = Futures.transform(html,
    new Function<String, Result>() {
      ... // post process
    }, procPool);
  results.add(result);
}

// blocks until all results are processed
List<Result> processed = Futures.allAsList(results).get();

firstPool.shutdownNow();
secondPool.shutdownNow();
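
For readers without Guava on the classpath, the same download-then-transform chain can be sketched with the JDK's own CompletableFuture (Java 8+). This swaps in a different API from the one used in this answer, and the download and postProcess helpers are hypothetical stand-ins, so treat it as a rough illustration rather than a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FutureChainSketch {

    // Hypothetical stand-in for a real HTTP download.
    static String download(String url) {
        return "content of " + url;
    }

    // Hypothetical post-processing step: here, just the content length.
    static Integer postProcess(String html) {
        return html.length();
    }

    static List<Integer> run(List<String> urls) {
        // Two bounded pools: one for downloads, one for post-processing.
        ExecutorService dlPool = Executors.newFixedThreadPool(2);
        ExecutorService procPool = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<Integer>> results = new ArrayList<>();
            for (String url : urls) {
                results.add(CompletableFuture
                        // download task runs on dlPool
                        .supplyAsync(() -> download(url), dlPool)
                        // post-processing runs on procPool once the download completes
                        .thenApplyAsync(FutureChainSketch::postProcess, procPool));
            }
            List<Integer> processed = new ArrayList<>();
            for (CompletableFuture<Integer> f : results) {
                processed.add(f.join()); // blocks until this result is ready
            }
            return processed;
        } finally {
            dlPool.shutdown();
            procPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "bb")));
    }
}
```

No polling loop is needed here: each post-processing step is scheduled by the completion of its download, and the pool sizes cap how many of each kind run at once.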

Upvotes: 1

JB Nizet

Reputation: 692081

Don't use low-level thread methods to do that. Have a downloadExecutor thread pool, and submit DownloadTask instances (implementing Runnable or Callable) to this pool.

At the end of the DownloadTask's code, submit a PostProcessPageTask instance (once again implementing Runnable or Callable) to a second postProcessExecutor thread pool.

You could use one or two CountDownLatch instances that every task decrements when it finishes, and have the main thread await this latch (or these latches) to know when the thread pools can be shut down.

See http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html and http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html for more information.
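
The approach described above could look roughly like the following. This is a minimal sketch, assuming a single latch counted once per page; the download and postProcess helpers, the pool sizes, and the class name are all hypothetical placeholders, not part of any library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TwoPoolPipeline {

    // Hypothetical stand-in for a real HTTP download.
    static String download(String url) {
        return "<html>" + url + "</html>";
    }

    // Hypothetical post-processing step: here, just the content length.
    static int postProcess(String html) {
        return html.length();
    }

    static Map<String, Integer> run(List<String> urls, int downloadThreads, int processThreads)
            throws InterruptedException {
        ExecutorService downloadExecutor = Executors.newFixedThreadPool(downloadThreads);
        ExecutorService postProcessExecutor = Executors.newFixedThreadPool(processThreads);
        Map<String, Integer> results = new ConcurrentHashMap<>();
        // One count per page, released when the page is fully processed or has failed.
        CountDownLatch done = new CountDownLatch(urls.size());

        for (String url : urls) {
            downloadExecutor.execute(() -> {
                try {
                    String html = download(url);
                    // Chain: hand the downloaded content to the second pool.
                    postProcessExecutor.execute(() -> {
                        try {
                            results.put(url, postProcess(html));
                        } finally {
                            done.countDown();
                        }
                    });
                } catch (RuntimeException e) {
                    // No post-processing task will run for this page.
                    done.countDown();
                }
            });
        }

        done.await(); // main thread waits here, no polling loop
        downloadExecutor.shutdown();
        postProcessExecutor.shutdown();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("a", "bb", "ccc"), 2, 2));
    }
}
```

The fixed-size pools reuse their threads and pull the next queued task as soon as one finishes, which is what limits concurrency without a hand-written while loop.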

Upvotes: 2
