Reputation: 6409
This is an implementation of producer consumer pattern for a homework. What's wrong with the below implementation. I have googled for various implementations, but I am not able to understand what went wrong in mine.
I have a shared queue
I synchronize the producer and consumer on the same lock
Implementation
Shared Queue:
class SharedQueue{
public static Queue<Integer> queue = new LinkedList<Integer>();
}
Producer Thread :
//The producer thread
class Producer implements Runnable{
public void run()
{
synchronized (SharedQueue.queue)
{
if(SharedQueue.queue.size() >=5)
{
try {
SharedQueue.queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Random r = new Random();
int x = r.nextInt(10);
System.out.println("Inside Producer" + x);
SharedQueue.queue.offer(x);
SharedQueue.queue.notify();
}
}
}
Consumer Thread:
class Consumer implements Runnable{
public void run()
{
synchronized (SharedQueue.queue)
{
if(SharedQueue.queue.size() == 0)
{
try {
SharedQueue.queue.wait();
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
int k = SharedQueue.queue.remove();
System.out.println("Inside consumer" + k);
}
}
}
The Main program
public class ProducerConsumerTest {
public static void main(String[] args)
{
Thread p = new Thread(new Producer());
Thread q = new Thread(new Consumer());
p.start();
q.start();
}
}
Upvotes: 2
Views: 4391
Reputation: 5237
Simply use my pattern with poison pills:
public sealed interface BaseMessage {
final class ValidMessage<T> implements BaseMessage {
@Nonnull
private final T value;
public ValidMessage(@Nonnull T value) {
this.value = value;
}
@Nonnull
public T getValue() {
return value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ValidMessage<?> that = (ValidMessage<?>) o;
return value.equals(that.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
@Override
public String toString() {
return "ValidMessage{value=%s}".formatted(value);
}
}
final class PoisonedMessage implements BaseMessage {
public static final PoisonedMessage INSTANCE = new PoisonedMessage();
private PoisonedMessage() {
}
@Override
public String toString() {
return "PoisonedMessage{}";
}
}
}
public class Producer implements Callable<Void> {
@Nonnull
private final BlockingQueue<BaseMessage> messages;
Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
this.messages = messages;
}
@Override
public Void call() throws Exception {
messages.put(new BaseMessage.ValidMessage<>(1));
messages.put(new BaseMessage.ValidMessage<>(2));
messages.put(new BaseMessage.ValidMessage<>(3));
messages.put(BaseMessage.PoisonedMessage.INSTANCE);
return null;
}
}
public class Consumer implements Callable<Void> {
@Nonnull
private final BlockingQueue<BaseMessage> messages;
private final int maxPoisons;
public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
this.messages = messages;
this.maxPoisons = maxPoisons;
}
@Override
public Void call() throws Exception {
int poisonsReceived = 0;
while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
BaseMessage message = messages.take();
if (message instanceof BaseMessage.ValidMessage<?> vm) {
Integer value = (Integer) vm.getValue();
System.out.println(value);
} else if (message instanceof BaseMessage.PoisonedMessage) {
++poisonsReceived;
} else {
throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
}
}
return null;
}
}
Upvotes: 0
Reputation: 1
public class ProducerAndConsumer {
public static void main(String a[]) {
Resource resource = new Resource();
Producer producer = new Producer(resource);
Consumer consumer = new Consumer(resource);
producer.start();
consumer.start();
}
}
class Resource {
private int item = 0;
boolean flag = true;
public void getItem() {
while (true) {
synchronized (this) {
if (!flag) {
try {
System.out.println("Consumer consume item :" + item);
flag = true;
Thread.sleep(10);
notify();
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
public void setItem() {
while (true) {
synchronized (this) {
if (flag) {
try {
item++;
System.out.println("Producer creating item :" + item);
flag = false;
Thread.sleep(10);
notify();
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
}
class Producer extends Thread {
Resource resource = null;
Producer(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
resource.setItem();
}
}
class Consumer extends Thread {
Resource resource = null;
Consumer(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
resource.getItem();
}
}
Upvotes: 0
Reputation: 9
You can use ConcurrentLinkedQueue to manage shared queue for the Producer and Consumer. You can make use of ConcurrentHashMap> collection, which will help Producer to produce concurrently and also Consumer can consume concurrently and keep the generated keys by the Producer in another collection object, where Consumer can find its key and consume it from the ConcurrentHashMap>.
Upvotes: 0
Reputation: 164
Easy way to implement Producer Consumer Problem is by using semaphore.
public class Semaphore {
int value;
public Semaphore(int intialValue) {
this.value = intialValue;
}
public synchronized void p() {
while (value <= 0) {
try {
this.wait();
} catch (InterruptedException e) {
}
}
value = value - 1;
}
public synchronized void v() {
value = value + 1;
this.notify();
}
}
public class ProducerConsumerUsingSemaphore {
private static final int SIZE = 10;
public static void main(String[] args) {
Semaphore full = new Semaphore(0);
Semaphore empty = new Semaphore(SIZE);
Semaphore mutex = new Semaphore(1);
Vector<Integer> sQueue = new Vector<Integer>();
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5000; i++) {
empty.p();
mutex.p();
System.out.println(Thread.currentThread().getName() + " is trying to insert item " + i);
sQueue.add(i);
mutex.v();
full.v();
}
}
});
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
full.p();
mutex.p();
System.out.println(Thread.currentThread().getName() + " consuming item " + sQueue.remove(0));
mutex.v();
empty.v();
}
}
});
producerThread.setName("Producer");
consumerThread.setName("Consumer");
consumerThread.start();
producerThread.start();
}
}
Upvotes: 1
Reputation: 311
I am assuming that you want this to be a endless loop of producer consumer. On top of Eng.Fouad changes, suround both synchonized blocks with:
while (true)
and in the Consumer add a notify
int k = SharedQueue.queue.remove();
// make the producer active again
SharedQueue.queue.notify();
System.out.println("Inside consumer " + k);
Upvotes: 1
Reputation: 117685
Try replacing:
if(SharedQueue.queue.size() >= 5)
with:
while(SharedQueue.queue.size() >= 5)
and this:
if(SharedQueue.queue.size() == 0)
with:
while(SharedQueue.queue.size() == 0)
Just to re-check the condition after calling notify()
.
Upvotes: 4