Priyath Gregory
Priyath Gregory

Reputation: 987

Java - Concurrent processing with a common executor service instance

I have n number of worker threads that retrieve records from a kinesis stream (this is not important for this problem), which are then pushed on to an executor service where the records are processed and persisted to a backend database. This same executor service instance is used for all worker threads.

Now there is a scenario where any given worker loop stops processing records and blocks until all records that were submitted by it are processed completely. This essentially means that there should be no pending/running threads in the executor service for a record from that particular worker thread.

A very trivial example of the implementation is like this:

  1. Worker class

    public class Worker {
    
    Worker(Listener listener){
        this.listener = listener;
    }
    
    //called periodically to fetch records from a kinesis stream
    public void processRecords(Record records) {
    
        for (Record record : records) {
            listener.handleRecord(record);
        }
    
        //if 15 minutes has elapsed, run below code. This is blocking.
        listener.blockTillAllRecordsAreProcessed()
    }
    

    }

  2. Listener class

    public class Listener {
    
        ExecutorService es;
    
        // same executor service is shared across all listeners.
        Listener(ExecutorService es){
            this.es = es;
        }
    
        public void handleRecord(Record record) {
            //submit record to es and return
            // non blocking
        }
    
        public boolean blockTillAllRecordsAreProcessed(){
            // this should block until all records are processed
            // no clue how to implement this with a common es
        }
    
    }
    

The only approach I could think of is to have a local executor service for each worker and do something like invokeAll for each batch, which would be change the implementation slightly but get the job done. but I feel like there should be a better approach to tackle this problem.

Upvotes: 0

Views: 140

Answers (1)

Nikhil Joshi
Nikhil Joshi

Reputation: 117

You could use the CountdownLatch class to block as follows:

public void processRecords(List<Record> records) {
 CountDownLatch latch = new CountDownLatch(records.size());
 for (Record record : records) {
     listener.handleRecord(record, latch);
 }

 //if 15 minutes has elapsed, run below code. This is blocking.
 listener.blockTillAllRecordsAreProcessed(latch)
 } 

public class Listener {
 ExecutorService es;
 ...
 public void handleRecord(Record record, CountDownLatch latch) {
     //submit record to es and return
     // non blocking
     es.submit(()->{
        someSyncTask(record);
        latch.countDown();
        
    })
 }

 public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
     System.out.println("waiting for processes to complete....");
     try {
          //current thread will get notified if all chidren's are done 
          // and thread will resume from wait() mode.
          latch.await();
          } catch (InterruptedException e) {
               e.printStackTrace();
          }
       }

   }

Read more here: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html

Upvotes: 1

Related Questions