Reputation: 2011
I have N workers that share a queue of elements to compute. At each iteration, each worker removes an element from the queue and can produce more elements to compute, that are going to be put in the same queue. Basically, each producer is also a consumer. The computation finishes when there are no elements on the queue and all the workers have finished computing the current element (hence no more elements to compute can be produced). I want to avoid a dispatcher/coordinator, so the workers should coordinate. What is the best pattern to allow a worker to find out if the halting conditions is valid, and hence halt the computation on behalf of the others?
For example, if all the threads would just do this cycle, when the elements are all computed, it would result in all the threads being blocked eternally:
while (true) {
element = queue.poll();
newElements[] = compute(element);
if (newElements.length > 0) {
queue.addAll(newElements);
}
}
Upvotes: 5
Views: 878
Reputation: 18148
Maintain a count of active threads.
public class ThreadCounter {
public static final AtomicInteger threadCounter = new AtomicInteger(N);
public static final AtomicInteger queueCounter = new AtomicInteger(0);
public static final Object poisonPill = new Object();
public static volatile boolean cancel = false; // or use a final AomticBoolean instead
}
Your threads' polling loop should look like the following (I'm assuming that you're using a BlockingQueue
)
while(!ThreadCounter.cancel) {
int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
ThreadCounter.cancel = true;
queue.offer(ThreadCounter.poisonPill);
} else {
Object obj = queue.take();
ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
ThreadCounter.queueCounter.decrementAndGet();
if(obj == ThreadCounter.poisonPill) {
queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
continue;
}
}
}
If a thread is about to block on the BlockingQueue
then it decrements the counter; if all threads are already waiting on the queue (meaning that counter == 0
), then the last thread sets cancel
to true, then sends a poison pill through the queue to wake up the other threads; each thread sees the poison pill, sends it back through the queue to wake up the remaining threads, and then exits the loop when it sees that cancel
is set to true.
Edit: I've removed the data race by adding a queueCounter
that maintains a count of the number of objects in the queue (obviously you'll also need to add a queueCounter.incrementAndGet()
call to wherever you're adding objects to the queue). This works as follows: if threadCount == 0
, but queueCount != 0
, then this means that a thread has just removed an item from the queue but has not yet called threadCount.getAndIncrement
, and so the cancel variable is not set to true. It's important that the threadCount.getAndIncrement
call precede the queueCount.getAndDecrement
call, otherwise you'll still have a data race. It shouldn't matter in what order you call queueCount.getAndIncrement
since you won't be interleaving this with a call to threadCount.getAndDecrement
(the latter will be called at the end of the loop, the former will be called at the beginning of the loop).
Note that you can't just use a queueCount
to determine when to end the process, because a thread might still be active without yet having placed any data in the queue - in other words, queueCount
would be zero, but would be non-zero once the thread finished its current iteration.
Instead of repeatedly sending the poisonPill
through the queue, you can instead have the canceling thread send (N-1) poisonPills
through the queue. Just be cautious if you use this approach using a different queue, because some queues (e.g. Amazon's Simple Queue Service) may return multiple items on the equivalent of their take
methods, in which case you'll need to repeatedly send through the poisonPill
to ensure that everything shuts down.
Additionally, instead of using a while(!cancel)
loop, you can use a while(true)
loop and break when the loop detects a poisonPill
Upvotes: 6