Denis Kulagin
Denis Kulagin

Reputation: 8917

Consumer/producer problem: pause production on slow consumption

I have a producer that reads blocks of text from the disk. Multiple consumers are doing computations on that blocks.

I would like producer to pause reading data from the disk if there are more that n blocks currently being computed over.

Have put it in pseudocode to illustrate what I would like to achieve.

// "produceBlocks" reads blocks from disk one by one
// and feeds them to lambda
produceBlocks(block -> {
  // (!) if activeCounter exceeds a THRESHOLD, then pause

  executorService.submit(() -> { 
     activeCounter.incrementAndGet();

     // do some work

     activeCounter.decrementAndGet();
  });
});

Upvotes: 3

Views: 56

Answers (2)

Alexei Kaigorodov
Alexei Kaigorodov

Reputation: 13535

"I would like producer to pause reading data from the disk if there are more that n blocks currently being computed over." The real task descrition is slightly different: the producer, before reading data from the disk, should aquire a permit to do so. If your producer is a thread, then natural facility to manage permits is Semaphore. Initialy it contains n permits. The producer, to read a block, aguires 1 permit with Semaphore::aquire. When the block is processed by the consumer, consumer releases 1 permit with Semaphore::release.

Another approach is to combine blocks and permits. Similary to the output queue from producer to consumer, create an input blocking queue for blocks. Initualy put there n blocks. The producer, to read a block, first takes the next block from that queue. Consumer, after handling a block, returns it to the input queue.

Upvotes: 1

Peter Lawrey
Peter Lawrey

Reputation: 533580

I would use a fixed length queue for your thread pool and implement a RejectedExecuptionHandler to either run in the current thread or to pause and retry.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/RejectedExecutionHandler.html#rejectedExecution(java.lang.Runnable,%20java.util.concurrent.ThreadPoolExecutor)

e.g.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

This last option I have used effectively and it doesn't require extra code once the ExecutorService is configured.

Upvotes: 3

Related Questions