Reputation: 987
I have n number of worker threads that retrieve records from a kinesis stream (this is not important for this problem), which are then pushed on to an executor service where the records are processed and persisted to a backend database. This same executor service instance is used for all worker threads.
Now there is a scenario where any given worker loop stops processing records and blocks until all records that were submitted by it are processed completely. This essentially means that there should be no pending/running threads in the executor service for a record from that particular worker thread.
A very trivial example of the implementation is like this:
Worker class
public class Worker {
Worker(Listener listener){
this.listener = listener;
}
//called periodically to fetch records from a kinesis stream
public void processRecords(Record records) {
for (Record record : records) {
listener.handleRecord(record);
}
//if 15 minutes has elapsed, run below code. This is blocking.
listener.blockTillAllRecordsAreProcessed()
}
}
Listener class
public class Listener {
ExecutorService es;
// same executor service is shared across all listeners.
Listener(ExecutorService es){
this.es = es;
}
public void handleRecord(Record record) {
//submit record to es and return
// non blocking
}
public boolean blockTillAllRecordsAreProcessed(){
// this should block until all records are processed
// no clue how to implement this with a common es
}
}
The only approach I could think of is to have a local executor service for each worker and do something like invokeAll
for each batch, which would be change the implementation slightly but get the job done. but I feel like there should be a better approach to tackle this problem.
Upvotes: 0
Views: 140
Reputation: 117
You could use the CountdownLatch class to block as follows:
public void processRecords(List<Record> records) {
CountDownLatch latch = new CountDownLatch(records.size());
for (Record record : records) {
listener.handleRecord(record, latch);
}
//if 15 minutes has elapsed, run below code. This is blocking.
listener.blockTillAllRecordsAreProcessed(latch)
}
public class Listener {
ExecutorService es;
...
public void handleRecord(Record record, CountDownLatch latch) {
//submit record to es and return
// non blocking
es.submit(()->{
someSyncTask(record);
latch.countDown();
})
}
public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
System.out.println("waiting for processes to complete....");
try {
//current thread will get notified if all chidren's are done
// and thread will resume from wait() mode.
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Read more here: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
Upvotes: 1