PNS

Reputation: 19905

Multithreaded file processing and reporting

I have an application that processes data stored in a number of files from an input directory and then produces some output depending on that data.

So far, the application works on a sequential basis, i.e. it launches a "manager" thread that

I would like to convert this into a multithreaded application, in which the "manager" thread

The number of "processor" threads would be at most equal to the number of files, since they would be recycled via a ThreadPoolExecutor.

Any solution avoiding the use of join() or wait()/notify() would be preferable.

Based on the above scenario:

  1. What would be the best way of having those "processor" threads reporting back to the "manager" thread? Would an implementation based on Callable and Future make sense here?
  2. How can the "manager" thread know when all "processor" threads are done, i.e. when all files have been processed?
  3. Is there a way of "timing" a processor thread and terminating it if it takes "too long" (i.e., it hasn't returned a result despite the lapse of a pre-configured amount of time)?

Any pointers to, or examples of, (pseudo-)source code would be greatly appreciated.

Upvotes: 3

Views: 1804

Answers (3)

Mike Deck

Reputation: 18397

You can definitely do this without using join() or wait()/notify() yourself.

You should take a look at java.util.concurrent.ExecutorCompletionService to start with.

The way I see it you should write the following classes:

  • FileSummary - Simple value object that holds the result of processing a single file
  • FileProcessor implements Callable<FileSummary> - The strategy for converting a file into a FileSummary result
  • FileManager - The high-level manager that creates FileProcessor instances, submits them to a work queue and then aggregates the results.

The FileManager would then look something like this:

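import java.io.File;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;

class FileManager {
   private CompletionService<FileSummary> cs; // Initialize this in constructor

   public FinalResult processDir(File dir) throws InterruptedException, ExecutionException {
      FinalResult result = new FinalResult(); // assumes FinalResult has a no-arg constructor
      int fileCount = 0;
      // Submit one task per file (listFiles() returns null if dir is not a readable directory)
      for (File f : dir.listFiles()) {
         cs.submit(new FileProcessor(f));
         fileCount++;
      }

      // take() blocks until some task has finished; get() rethrows a task failure as ExecutionException
      for (int i = 0; i < fileCount; i++) {
         FileSummary summary = cs.take().get();
         // aggregate summary into final result;
      }
      return result;
   }
}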

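If you want to implement a timeout you can use the timed poll(timeout, unit) method on CompletionService instead of take(). It returns null if no task completes within the timeout. Note that this only stops the manager from waiting; to stop the task itself you still have to cancel its Future.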
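A minimal sketch of the timeout variant, assuming one overall deadline for the whole batch rather than a per-file limit. The class name TimedCollector and its parameters are made up for illustration; the calls used (ExecutorCompletionService, poll with a timeout, Future.cancel) are standard java.util.concurrent API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original answer.
class TimedCollector {
    // Runs all tasks and returns the results that arrive before one overall
    // deadline. Tasks still pending at the deadline are cancelled.
    static <T> List<T> collect(List<Callable<T>> tasks, int threads, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CompletionService<T> cs = new ExecutorCompletionService<>(pool);
        List<Future<T>> futures = new ArrayList<>();
        List<T> results = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(cs.submit(task));
            }
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            for (int i = 0; i < tasks.size(); i++) {
                long remaining = deadline - System.nanoTime();
                // poll returns null if nothing completes within the remaining time
                Future<T> done = cs.poll(Math.max(remaining, 0), TimeUnit.NANOSECONDS);
                if (done == null) {
                    break; // deadline reached, stop waiting
                }
                try {
                    results.add(done.get());
                } catch (ExecutionException e) {
                    // the task threw; this sketch simply skips it
                }
            }
        } finally {
            for (Future<T> f : futures) {
                f.cancel(true); // no effect on finished tasks, interrupts running ones
            }
            pool.shutdown();
        }
        return results;
    }
}
```

cancel(true) only delivers an interrupt, so a task stops early only if its code responds to interruption (blocking calls such as Thread.sleep do, a tight CPU loop does not unless it checks Thread.interrupted()).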

Upvotes: 2

David Schwartz

Reputation: 182829

  1. No need for them to report back. Just have a count of the number of jobs remaining to be done and have the thread decrement that count when it's done.

  2. When the count of jobs remaining to be done reaches zero, all the "processor" threads are done.

  3. Sure, just add that code to the thread. When it starts working, check the time and compute the stop time. Periodically (say when you go to read more from the file), check to see if it's past the stop time and, if so, stop.
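The three points above can be sketched as follows. This is one possible reading, not the answerer's code: it keeps the remaining-job count in a java.util.concurrent.CountDownLatch (so the manager can block on it without wait()/notify()), and models each file as a list of small steps so that the worker has natural places to check its stop time. The names CountingManager, runAll and budgetMillis are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: each job is a list of small steps (e.g. "read next chunk").
class CountingManager {
    // Returns how many jobs ran all of their steps before their own stop time.
    static int runAll(List<List<Runnable>> jobs, int threads, long budgetMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch remaining = new CountDownLatch(jobs.size()); // jobs left to do
        AtomicInteger completed = new AtomicInteger();
        try {
            for (List<Runnable> job : jobs) {
                pool.execute(() -> {
                    try {
                        // Compute the stop time when the job actually starts working
                        long stopTime = System.nanoTime()
                                + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
                        for (Runnable step : job) {
                            if (System.nanoTime() - stopTime > 0) {
                                return; // past the stop time: give up on this job
                            }
                            step.run();
                        }
                        completed.incrementAndGet();
                    } finally {
                        remaining.countDown(); // decrement whether finished or abandoned
                    }
                });
            }
            remaining.await(); // returns once the count has reached zero
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }
}
```

Because the check happens only between steps, a single step that blocks forever is never interrupted; this approach relies on every step being short.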

Upvotes: 1

Alex D

Reputation: 30465

wait()/notify() are very low level primitives and you are right in wanting to avoid them.

The simplest solution would be to use a thread-safe queue (or stack, etc. -- it doesn't really matter in this case). Before starting the worker threads, your main thread can add all the Files to the thread-safe queue/stack. Then start the worker threads, and let them all pull Files and process them until there are none left.

The worker threads can add results to another thread-safe queue/stack, from which the main thread can retrieve them. The main thread knows how many Files there were, so when it has retrieved the same number of results, it will know that the job is finished.

Something like a java.util.concurrent.BlockingQueue would work, and there are other thread-safe collections in java.util.concurrent which would also be fine.
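A rough sketch of the two-queue arrangement, under a few assumptions: the class name QueueWorkers and the generic processor function are invented for illustration, the processor never throws and never returns null (otherwise a result would go missing and the main thread would wait forever), and the work queue is fully filled before any worker starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical sketch of the work-queue / result-queue pattern.
class QueueWorkers {
    static <I, O> List<O> process(List<I> inputs, int workers, Function<I, O> processor)
            throws InterruptedException {
        // Work queue is filled before any worker starts, so an empty poll means "done"
        ConcurrentLinkedQueue<I> work = new ConcurrentLinkedQueue<>(inputs);
        BlockingQueue<O> results = new LinkedBlockingQueue<>();

        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                I item;
                // Pull items until the work queue is empty
                while ((item = work.poll()) != null) {
                    results.add(processor.apply(item));
                }
            });
            t.setDaemon(true); // do not keep the JVM alive for a stuck worker
            t.start();
        }

        // The main thread knows how many results to expect
        List<O> collected = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            collected.add(results.take()); // blocks until the next result arrives
        }
        return collected;
    }
}
```

Results arrive in completion order, not input order, so the result type should carry whatever identifies its source file.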

You also asked about terminating worker threads which are taking too long. I will tell you right up front: if you can make the code which runs on the worker threads robust enough that you can safely leave this feature out, you will make things a lot simpler.

If you do need this feature, the simplest and most reliable solution is to have a per-thread "terminate" flag, and make the worker task code check that flag frequently and exit if it is set. Make a custom class for workers, and include a volatile boolean field for this purpose. Also include a setter method (because of volatile, it doesn't need to be synchronized).
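The flag described above might look like the following sketch. The class name StoppableWorker is hypothetical, and the loop body is a placeholder (a counter increment) standing in for one small unit of real file processing.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical worker with a cooperative "terminate" flag.
class StoppableWorker implements Runnable {
    // volatile guarantees that a write by the manager thread is seen by the worker
    private volatile boolean terminate = false;
    private final AtomicLong unitsDone = new AtomicLong();

    // No synchronization needed: a single volatile write is atomic and visible
    public void setTerminate(boolean terminate) {
        this.terminate = terminate;
    }

    public long getUnitsDone() {
        return unitsDone.get();
    }

    @Override
    public void run() {
        // Check the flag between small units of work and exit once it is set
        while (!terminate) {
            unitsDone.incrementAndGet(); // placeholder for real work
            Thread.yield();
        }
    }
}
```

The manager would keep a reference to each worker and call setTerminate(true) once its time limit has passed. The worker only notices at its next check, so each unit of work has to be short.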

If a worker discovers that its "terminate" flag is set, it could push its File object back on the work queue/stack so another thread can process it. Of course, if there is some problem which means the File cannot be successfully processed, this will lead to an infinite cycle.

The best approach is to make the worker code very simple and robust, so you don't need to worry about it "not terminating".

Upvotes: 1
