In the following scenario, the finalizer thread must wait for the consumer thread to process all queue elements in order to finish execution:
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private final Object queueMonitor = new Object();
// Consumer thread
while (true) {
    Object element = queue.take();
    consume(element);
    synchronized (queueMonitor) {
        queueMonitor.notifyAll();
    }
}
// Finalizer thread
synchronized (queueMonitor) {
    while (!queue.isEmpty()) {
        queueMonitor.wait();
    }
}
Elements are added to the queue over time. The consumer daemon thread runs all the time until the JVM terminates, at which point it must be allowed to complete the processing of all queued elements. Currently this is accomplished by the finalizer thread, which is a shutdown hook that should delay the killing of the consumer thread on JVM termination.
Problem:
If the finalizer thread is started after the last element has been taken out of the queue, then the while loop condition evaluates to false, so execution completes while consume() has not returned yet, because waiting on queueMonitor is skipped completely.
Research:
An ideal solution would be to peek the queue, and then remove the element after it has been consumed.
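The peek-then-remove idea could look roughly like the sketch below. It assumes a single consumer thread (so the head seen by peek() is still the head when remove() is called), and it polls with a short sleep because peek() does not block. The class name PeekThenRemove, the methods add(), consumedCount(), consumeLoop() and awaitDrained(), and the body of consume() are hypothetical stand-ins, not part of the original code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: assumes exactly one consumer thread removes from the queue.
final class PeekThenRemove {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Object queueMonitor = new Object();
    private final AtomicInteger consumed = new AtomicInteger();

    void add(Object element) {
        queue.add(element);
    }

    int consumedCount() {
        return consumed.get();
    }

    // Hypothetical stand-in for the real consume(); simulates some work.
    private void consume(Object element) throws InterruptedException {
        Thread.sleep(20);
        consumed.incrementAndGet();
    }

    // Consumer thread body
    void consumeLoop() {
        try {
            while (true) {
                Object element = queue.peek(); // element stays in the queue
                if (element == null) {
                    Thread.sleep(10); // peek() never blocks, so poll
                    continue;
                }
                consume(element);
                synchronized (queueMonitor) {
                    queue.remove(); // drop the head only after consume() returned
                    queueMonitor.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Finalizer thread body: an empty queue now implies no consume() in flight.
    void awaitDrained() throws InterruptedException {
        synchronized (queueMonitor) {
            while (!queue.isEmpty()) {
                queueMonitor.wait();
            }
        }
    }
}
```

The cost of this variant is the polling loop that replaces the blocking take(), which is why it is only a partial fit for a long-running daemon.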
Upvotes: 1
Views: 1320
Reputation: 448
One approach is to use a CountDownLatch: have the finalizer block on it and the consumer call countDown() once it has finished its last consume().
Basically, don't block on the queue; block on task completion.
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private volatile boolean running = true;
private final CountDownLatch terminationLatch = new CountDownLatch(1);
// Consumer thread
while (running || !queue.isEmpty()) {
    Object element = queue.poll(100, TimeUnit.MILLISECONDS);
    if (element == null) continue;
    consume(element);
}
terminationLatch.countDown();
// Finalizer thread
running = false;
terminationLatch.await();
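For reference, here is the same idea as a self-contained sketch that also registers the finalizer as a shutdown hook. The class name LatchShutdown, the methods add(), consumedCount(), consumeLoop(), shutdownAndAwait() and registerShutdownHook(), and the counting consume() are assumptions made for illustration; only the queue, the running flag and the latch come from the snippet above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchShutdown {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private final CountDownLatch terminationLatch = new CountDownLatch(1);
    private final AtomicInteger consumed = new AtomicInteger();

    void add(Object element) {
        queue.add(element);
    }

    int consumedCount() {
        return consumed.get();
    }

    // Hypothetical stand-in for the real consume(); just counts elements.
    private void consume(Object element) {
        consumed.incrementAndGet();
    }

    // Consumer thread body: keeps draining after running becomes false.
    void consumeLoop() {
        try {
            while (running || !queue.isEmpty()) {
                Object element = queue.poll(100, TimeUnit.MILLISECONDS);
                if (element == null) continue;
                consume(element);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Release the finalizer even if the consumer exits abnormally.
            terminationLatch.countDown();
        }
    }

    // Finalizer body: request the stop, then wait for the consumer to finish.
    void shutdownAndAwait() throws InterruptedException {
        running = false;
        terminationLatch.await();
    }

    // Runs the finalizer as a JVM shutdown hook.
    void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                shutdownAndAwait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "finalizer"));
    }
}
```

Putting countDown() in a finally block is a deliberate deviation from the snippet above: it keeps the shutdown hook from hanging if the consumer is interrupted or consume() throws.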
Upvotes: 3