Noobie93

Reputation: 71

Improve Performance for reading file line by line and processing

I have a piece of Java code that does the following:

  1. Opens a file with data in the format {A,B,C}; each file has approx. 5,000,000 lines.
  2. For each line in the file, calls a service that returns a column D and appends it to {A,B,C}, giving {A,B,C,D}.
  3. Writes this entry to a chunkedwriter that groups 10,000 lines together and eventually writes each chunk back to a remote location.

Right now the code takes 32 hours to execute. The process is then repeated on another file, which hypothetically takes another 32 hours, but we need both runs to complete daily.

Step 2 is further complicated by the fact that the service sometimes does not have D; it is designed to fetch D from its super data store, so it throws a transient exception asking you to wait. We have retries to handle this, so an entry could technically be retried 5 times with a max delay of 60,000 millis. In the worst case we could be looking at 5,000,000 * 5 calls.
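The retry logic looks roughly like this (a simplified sketch: `TransientException`, `callWithRetries`, and the starting delay are stand-ins for our real service, not the actual implementation):

```java
import java.util.concurrent.Callable;

public class RetryExample {
    // Stand-in for the service's "wait and try again" exception.
    static class TransientException extends RuntimeException {}

    // Retry a call up to maxAttempts times with a capped exponential backoff.
    static <T> T callWithRetries(Callable<T> call, int maxAttempts, long maxDelayMillis)
            throws Exception {
        long delay = Math.min(1000, maxDelayMillis); // assumed starting delay
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (TransientException e) {
                if (attempt >= maxAttempts) {
                    throw e; // exhausted all attempts, give up
                }
                Thread.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMillis); // back off, capped
            }
        }
    }
}
```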

The combination of {A,B,C} are unique and thus result D can't be cached and reused and a fresh request has to be made to get D every time.

I've tried adding threads like this:

temporaryFile = File.createTempFile(key, ".tmp");
Files.copy(stream, temporaryFile.toPath(),
        StandardCopyOption.REPLACE_EXISTING);
reader = new BufferedReader(new InputStreamReader(
        new FileInputStream(temporaryFile), StandardCharsets.UTF_8));
String entry;
while ((entry = reader.readLine()) != null) {
    final String finalEntry = entry;
    service.execute(() -> {
        try {
            processEntry(finalEntry);
        } catch (Exception e) {
            log.error("something");
        }
    });
    count++;
}

Here processEntry method abstracts the implementation details explained above and threads are defined as

ExecutorService service = Executors.newFixedThreadPool(10);

The problem I'm having is that the first set of threads spins up, but the process doesn't wait for all threads to finish their work and all 5,000,000 lines to complete. So the task that used to run for 32 hours now finishes in under a minute, which messes up our system's state. Are there alternative ways to do this? How can I make the process wait for all threads to complete?

Upvotes: 0

Views: 361

Answers (3)

Gray

Reputation: 116908

The problem I'm having is that the first set of threads spins up, but the process doesn't wait for all threads to finish their work and all 5,000,000 lines to complete.

When you submit jobs to an ExecutorService, they are queued in the service and run in the background. To wait for them all to complete, you need to shut the service down and then wait for it to terminate:

ExecutorService service = Executors.newFixedThreadPool(10);
// submit jobs to the service here
// after the last job has been submitted, we immediately shutdown the service
service.shutdown();
// then we can wait for it to terminate as the jobs run in the background
service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

Also, if there is a huge number of lines in these files, I would recommend using a bounded queue for the jobs so that you don't blow out memory by effectively caching every line of the file. This only works if the files stay around and don't go away while the jobs drain.

// this is the same as a newFixedThreadPool(10) but with a queue of 100
// this is the same as newFixedThreadPool(10) but with a bounded queue of 100
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>(100));
// set a rejected-execution handler so the caller blocks once the queue is full
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // re-queue the rejected job, blocking until space frees up
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});

Write this entry to a chunkedwriter that groups 10,000 lines together and eventually writes each chunk back to a remote location.

As each A,B,C job finishes, if it needs to be processed in a second step then I would also recommend looking into a ExecutorCompletionService which allows you to chain various different thread pools together so as lines finish they will immediately start working on the 2nd phase of the processing.

If instead this chunkedWriter is just a single thread then I'd recommend sharing a BlockingQueue<Result> and having the executor threads put to the queue once the lines are done and the chunkedWriter taking from the queue and doing the chunking and writing of the results. In this situation, indicating to the writer thread that it is done needs to be handled carefully -- maybe with some sort of END_RESULT constant put to the queue by the main thread waiting for the service to terminate.
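A minimal sketch of that single-writer pattern (the END_RESULT sentinel, the queue draining, and the chunk handling here are illustrative, not the poster's actual code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WriterSketch {
    // Sentinel ("poison pill") telling the writer that no more results are coming.
    static final String END_RESULT = "\u0000END";

    // Run by the single writer thread: take results until the sentinel arrives,
    // writing them out in chunks of chunkSize.
    static List<String> drain(BlockingQueue<String> queue, int chunkSize)
            throws InterruptedException {
        List<String> written = new ArrayList<>();
        List<String> chunk = new ArrayList<>(chunkSize);
        for (String r = queue.take(); !r.equals(END_RESULT); r = queue.take()) {
            chunk.add(r);
            if (chunk.size() == chunkSize) {
                written.addAll(chunk); // stand-in for writing the chunk remotely
                chunk.clear();
            }
        }
        written.addAll(chunk); // flush the final partial chunk
        return written;
    }
}
```

The executor threads `put` results to the queue as lines finish; the main thread puts END_RESULT only after `awaitTermination` returns, so the writer cannot stop early.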

Upvotes: 1

Amri Maher

Reputation: 58

  • Think about using an ExecutorCompletionService: if you want to take tasks as they complete, an ExecutorCompletionService acts as a BlockingQueue that lets you poll for tasks as and when they finish.
  • Another solution is to shut the executor down and then wait for it to terminate: ExecutorService service = Executors.newFixedThreadPool(10); service.shutdown(); while (!service.isTerminated()) {} (note this busy-waits; awaitTermination is usually preferable).
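For the first bullet, a rough sketch of how ExecutorCompletionService could be wired up (processEntry here is a stand-in for the poster's real method, and the pool size mirrors the question):

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletionExample {
    // Stand-in for the real per-line work from the question.
    static String processEntry(String entry) {
        return entry + ",D"; // pretend the service appended column D
    }

    // Submit every entry, then take() results in completion order;
    // returns how many entries finished.
    static int runAll(String[] entries) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        for (String e : entries) {
            cs.submit(() -> processEntry(e));
        }
        int done = 0;
        for (int i = 0; i < entries.length; i++) {
            cs.take().get(); // blocks until the next task finishes
            done++;
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return done;
    }
}
```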

Upvotes: 1

Juan

Reputation: 5589

One alternative is to use a latch to wait for all the tasks to complete before you shut down the executor on the main thread.

Initialize a CountDownLatch with a count of 1.
After you exit the loop that submits the tasks, call latch.await();

In the task you start you have to have a callback on the starting class to let it know when a task has finished.

Note that in the starting class the callback function has to be synchronized.

In the starting class you use this callback to take the count of completed tasks.

Also, inside the callback, when all tasks have completed, call latch.countDown() so the main thread can continue, for example to shut down the executor and exit.

This shows the main concept, it can be implemented with more detail and more control on the completed tasks if necessary.

It would be something like this:

public class StartingClass {


    CountDownLatch latch = new CountDownLatch(1);

    ExecutorService service = Executors.newFixedThreadPool(10);
    BufferedReader reader;
    Path stream;
    int count = 0;
    int completed = 0;
    public void runTheProcess() throws IOException, InterruptedException {
        File temporaryFile = File.createTempFile(key, ".tmp");
        Files.copy(stream, temporaryFile.toPath(), 
               StandardCopyOption.REPLACE_EXISTING);
        reader = new BufferedReader(new InputStreamReader(new 
               FileInputStream(temporaryFile), StandardCharsets.UTF_8));
        String entry;
        while ((entry = reader.readLine()) != null) {
           final String finalEntry = entry;
           service.execute(new Task(this,finalEntry));
           count++;
        }
        latch.await();
        service.shutdown();
    }

    public synchronized void processEntry(String entry) {

    }

    public synchronized void taskCompleted() {
        completed++;
        if(completed == count) {
            latch.countDown();
        }
    }

    //This can be put in a different file.
    public static class Task implements Runnable {
        StartingClass startingClass;
        String finalEntry;

        public Task(StartingClass startingClass, String finalEntry) {
            this.startingClass = startingClass;
            this.finalEntry = finalEntry;
        }

        @Override
        public void run() {
            try {
                startingClass.processEntry(finalEntry);
            } catch (Exception e) {
                //log.error("something");
            } finally {
                // count the task even on failure so the latch can still reach zero
                startingClass.taskCompleted();
            }
        }

    }

}

Note that you need to close the file. Also, the shutting down of the executor could be written to wait a few seconds before forcing a shutdown.

Upvotes: 1
