Reputation: 1354
I am trying to write a Producer Consumer code. Below is the original code I wrote.
Stack<Integer> buffer = new Stack<>();
volatile int i = 1;
class Consumer implements Runnable {
@Override
public void run() {
while(true){
synchronized (buffer) {
System.out.println("Consumer taking lock : " + Thread.currentThread().getName());
while(buffer.isEmpty()){
try{
System.out.println("Consumer releasing lock :" + Thread.currentThread().getName());
buffer.wait();
System.out.println("Consumer woken up :" + Thread.currentThread().getName());
} catch(InterruptedException ie){
ie.printStackTrace();
}
}
System.out.println(buffer.pop());
buffer.notify();
}
}
}
}
class Producer implements Runnable {
@Override
public void run() {
while(true){
synchronized (buffer) {
System.out.println("Producer taking lock : " + Thread.currentThread().getName());
while(!buffer.isEmpty()){
try {
System.out.println("Producer going into wait set :" + Thread.currentThread().getName());
buffer.wait();
System.out.println("Producer woken up :" + Thread.currentThread().getName());
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
buffer.push(i);
i++;
buffer.notify();
}
}
}
}
public static void main(String[] args) {
ProducerConsumerUnitBuffer obj = new ProducerConsumerUnitBuffer();
Thread producerThread1 = new Thread(obj.new Consumer());
Thread consumerThread1 = new Thread(obj.new Producer());
Thread producerThread2 = new Thread(obj.new Consumer());
Thread consumerThread2 = new Thread(obj.new Producer());
Thread producerThread3 = new Thread(obj.new Consumer());
Thread consumerThread3 = new Thread(obj.new Producer());
Thread producerThread4 = new Thread(obj.new Consumer());
Thread consumerThread4 = new Thread(obj.new Producer());
Thread producerThread5 = new Thread(obj.new Consumer());
Thread consumerThread5 = new Thread(obj.new Producer());
Thread producerThread6 = new Thread(obj.new Consumer());
Thread consumerThread6 = new Thread(obj.new Producer());
Thread producerThread7 = new Thread(obj.new Consumer());
Thread consumerThread7 = new Thread(obj.new Producer());
Thread producerThread8 = new Thread(obj.new Consumer());
Thread consumerThread8 = new Thread(obj.new Producer());
Thread producerThread9 = new Thread(obj.new Consumer());
Thread consumerThread9 = new Thread(obj.new Producer());
Thread producerThread10 = new Thread(obj.new Consumer());
Thread consumerThread10 = new Thread(obj.new Producer());
producerThread1.start();
consumerThread1.start();
producerThread2.start();
consumerThread2.start();
producerThread3.start();
consumerThread3.start();
producerThread4.start();
consumerThread4.start();
producerThread5.start();
consumerThread5.start();
producerThread6.start();
consumerThread6.start();
producerThread7.start();
consumerThread7.start();
producerThread8.start();
consumerThread8.start();
producerThread9.start();
consumerThread9.start();
producerThread10.start();
consumerThread10.start();
}
This code always stalls. Although the application doesn't terminate, it stops printing anything which means no thread is entering the synchronized block.
Although, when I use notifyAll() instead of notify() the code works perfectly fine.
EDIT
As per suggestions, I tried changing the code so that there are 2 separate objects for producers and consumers to take a lock on. Also, the producers notify() the consumers when an object is put in the buffer.
public class ProducerConsumerDifferentObjects {
Stack<Integer> buffer = new Stack<>();
Boolean producerLockingObject = Boolean.FALSE;
Boolean consumerLockingObject = Boolean.TRUE;
volatile int i = 1;
class Consumer implements Runnable {
@Override
public void run() {
while (true) {
synchronized (consumerLockingObject) {
while (buffer.isEmpty()) {
try {
consumerLockingObject.wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
System.out.println(buffer.pop());
consumerLockingObject.notify();
}
}
}
}
class Producer implements Runnable {
@Override
public void run() {
while (true) {
synchronized (producerLockingObject) {
while (!buffer.isEmpty()) {
try {
producerLockingObject.wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
buffer.push(i);
i++;
producerLockingObject.notify();
synchronized (consumerLockingObject) {
consumerLockingObject.notify();
}
}
}
}
}
public static void main(String[] args) {
ProducerConsumerDifferentObjects obj = new ProducerConsumerDifferentObjects();
Thread producerThread1 = new Thread(obj.new Consumer());
Thread consumerThread1 = new Thread(obj.new Producer());
Thread producerThread2 = new Thread(obj.new Consumer());
Thread consumerThread2 = new Thread(obj.new Producer());
Thread producerThread3 = new Thread(obj.new Consumer());
Thread consumerThread3 = new Thread(obj.new Producer());
Thread producerThread4 = new Thread(obj.new Consumer());
Thread consumerThread4 = new Thread(obj.new Producer());
Thread producerThread5 = new Thread(obj.new Consumer());
Thread consumerThread5 = new Thread(obj.new Producer());
Thread producerThread6 = new Thread(obj.new Consumer());
Thread consumerThread6 = new Thread(obj.new Producer());
Thread producerThread7 = new Thread(obj.new Consumer());
Thread consumerThread7 = new Thread(obj.new Producer());
Thread producerThread8 = new Thread(obj.new Consumer());
Thread consumerThread8 = new Thread(obj.new Producer());
Thread producerThread9 = new Thread(obj.new Consumer());
Thread consumerThread9 = new Thread(obj.new Producer());
Thread producerThread10 = new Thread(obj.new Consumer());
Thread consumerThread10 = new Thread(obj.new Producer());
producerThread1.start();
consumerThread1.start();
producerThread2.start();
consumerThread2.start();
producerThread3.start();
consumerThread3.start();
producerThread4.start();
consumerThread4.start();
producerThread5.start();
consumerThread5.start();
producerThread6.start();
consumerThread6.start();
producerThread7.start();
consumerThread7.start();
producerThread8.start();
consumerThread8.start();
producerThread9.start();
consumerThread9.start();
producerThread10.start();
consumerThread10.start();
}
}
Upvotes: 0
Views: 324
Reputation: 65898
In the multiple producer/multiple consumer (MPMC) scenario you are trying to use single object (buffer
) for notify both consumers and producers. This is why you got stalling eventually: instead of notifying a producer, one consumer notifies another consumer. (Or producers notifiers another producer).
Consider the following possible ordering of events:
So all consumers and producers are in a waiting state.
Having more consumers and/or producers doesn't help: it still possible to all of them to be in a waiting state.
Possible ways for resolve the problem:
Having different objects for notify consumers and producers. In Java, you may create two Condition objects from the single lock, one condition for notify consumers, and another - for notify producers.
Additionally synchronize producers with themselves, and consumers with themselves. So at most single producer and single consumer may wait on the common object.
Both approaches eliminates possibility of consumer-consumer or producer-producer notification.
It could be that changing wait condition for producers from "the buffer is non-empty" to "the buffer is full" will help in case of buffer's capacity of 2 elements or more. consumer-consumer notification is still possible, but may be complete stalling would be avoided. But this would be very tricky.
Upvotes: 2