Reputation: 10565
I wrote a typical producer-consumer program: one producer, which is responsible for filling the queue; one consumer, which is responsible for dequeuing the queue. Below is my implementation.
The tricky part is that there is another sharing variable vFlag. the process(str) may change its value to true, which will be detected by the producer thread, and cause emptying the queue.
private class PQueue
{
private Queue<String> queue;
PQueue()
{
queue = new LinkedList<String>();
}
public synchronized void add(String str)
{
if(queue.size() > 1)
{
wait();
}
queue.add(token);
notify();
}
public synchronized String poll()
{
if(queue.size() == 0)
{
wait();
}
String str = queue.poll();
notify();
return str;
}
public synchronized void clear()
{
queue.clear();
}
}
PQueue queue = new PQueue();
private class Producer implements Runnable
{
public void run()
{
while (true) {
String str = read();
queue.add(str);
if(vFlag.value == false)
{
queue.clear();
vFlag.value = true;
}
if (str.equals("end"))
break;
}
exitFlag = true;
}
}
private class Consumer implements Runnable
{
public void run()
{
while(exitFlag == false)
{
String str = queue.poll();
process(str, vFlag);
}
}
}
Right now, there is still some bug. Could anyone give me some clue or suggestions in general, like a better concurrent programming philosophy or mechanism to avoid the concurrency bugs?
For above implementation, the program sometimes reports an err in process(str, vFlag):
Exception in thread "Thread-3" java.lang.NullPointerException
In other cases, the program runs normally. If I comment out the queue.clear(), it works well.
Upvotes: 0
Views: 724
Reputation: 116918
You need to make sure that the vFlag.value
field is volatile
since the boolean is being changed by multiple threads. Also, I assume that the producer once it sees the value
has been set to true
, drains the queue and then sets it to false
. That sounds like race conditions may result. You might consider using AtomicBoolean
instead so you can use compareAndSet(...)
to avoid overwrites.
I'm not sure this is a problem but you should consider doing something like the following to remove some of the set/unset race conditions. You don't need AtomicBoolean
for this:
while (!vFlag.value) {
// set to true immediately and then clear to avoid race
vFlag.value = true;
queue.clear();
}
You might also consider using a BlockingQueue
instead of synchronizing your own Queue
. BlockingQueue
s handle the thread-safety for you and also have a take()
method which does the wait()
logic for you. Your code seems to want to block if there is an item in your queue so you may want to use new LinkedBlockingQueue(1)
which limits the size of the queue to 1 which will block the put(...)
.
Lastly, it is always recommended in these cases to use while
loops instead of if
loops:
// always use while instead of if statements here
while (queue.size() == 0) {
wait();
}
This solves the problem of unexpected signals and multiple consumers or producers.
Upvotes: 2
Reputation: 171
For a generic thread safe queue you can use ConcurrentLinkedQueue or if you want to limit to one object in a queue at a time SynchronousQueue finally if one object at a time, but precalculate the next object can be done with ArrayBlockingQueue.
The Java memory model allows for effective caching of values, as well as reordering of instructions by a compiler assuming that reordering would not change the results if just one thread were running. As such the producer is not guaranteed to ever view vFlag as true, as it is allowed to use the cached value. Thus the consumer thread sees vflag as true, and the producer sees it as false. Declaring a variable as volatile means that JVM has to look up the value each time, and produces a memory fence between on either side of the access which prevents reordering. For instance
public class vFlagClass {
public volatile boolean value;
}
The consumer may continue to access the queue after it sets vFlag true, either the consumer should clear the queue itself, or it needs to wait for a signal from the producer that the queue is cleared. Even assuming value was volatile, it would still be acceptable operation for the program to consume all of the input in the queue after vFlag.value got set true, before activating the producer and clearing the queue.
Additionally your queue structure assumes only one thread is writing and one thread is reading, by having the producer clear the queue, the producer is effectively reading from the queue and breaking your invariant.
For your error specifically I imagine that what is happening is that the consumer tells the producer to clear the queue when the queue is empty and then tries the queue again, the queue is still empty so it waits. The producer then wakes up, fills the queue, this wakes up the consumer, the producer then clears the queue, the consumer starts going and polls the queue, however the queue is empty. And so NULL is returned. In other words
Producer : queue.add(str); // queue size is now 1
Producer : goes to end of loop
Consumer : String str = queue.poll(); // queue size is now 0
Consumer : process(str, vFlag); // vFlag.value is set to false
Consumer : String str = queue.poll(); // waits at
if(queue.size() == 0)
{
wait();
}
Producer : queue.add(str);
Producer : notify() // wakes up consumer
Producer : queue.clear();
Consumer : String str = queue.poll(); // note queue is empty, so returns NULL
In this case you would need to do change PQueue.poll to
String str = null;
while((str = queue.poll()) == null)
wait();
return str;
but this is not a good solution as it still has all of the other bugs mentioned earlier. Note: This only allows two threads to read from the queue, it does not protect against multiple threads writing to the queue.
Edit: Noticed another race condition, the Producer may put "end" into the queue, the Consumer read and process it, and then the Producer updates exitFlag. Note that changing exitFlag does not fix anything as it has nothing to do with reading stale data. Instead of using an exitFlag, you should have the Consumer also break when it reads "end".
See the documentation for more information on memory consistency in Java.
Upvotes: 2