serah
serah

Reputation: 2117

Issue with Producer Consumer in Java - Missing synchronization?

I guess I have not used synchronization properly. I get the below output .

I have consciously chosen not to use BlockingQueue OR java 5 concurrency features. I had written this so that I could learn synchronization and some basics.

Producer Thread: PRODUCER-1 adding item 0-Name-0 to queue
Consumer Thread CONSUMER-2 processing item: 0-Name-0
Producer Thread: PRODUCER-2 adding item 1-Name-1 to queue

Can you please help me understand where I am going wrong?

public class ProducerConsumerManager {

public static void main(String args[]){

    ItemQueue itemQueue = new ItemQueue();

    Producer producer1 = new Producer(itemQueue,15, 500);
    Producer producer2 = new Producer(itemQueue,15, 1000);
    Consumer consumer1 = new Consumer(itemQueue,500);
    Consumer consumer2 = new Consumer(itemQueue,1500);

    Thread producerThread1 = new Thread(producer1,"PRODUCER-1");
    Thread producerThread2 = new Thread(producer2,"PRODUCER-2");
    Thread consumerThread1 = new Thread(consumer1,"CONSUMER-1");
    Thread consumerThread2 = new Thread(consumer2,"CONSUMER-2");

    producerThread1.start();
    producerThread2.start();

    consumerThread1.start();
    consumerThread2.start();


    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        System.out.println("The MAIN THREAD has been INTERRUPTED");
    }


}
}


 public class Consumer implements Runnable{

private ItemQueue itemQueue;
private int waitTimeInMillis;
public Consumer(ItemQueue queue, int waitTimeInMillis){
    itemQueue = queue;
    this.waitTimeInMillis = waitTimeInMillis;
}

private boolean processItem(Item item){     
    if(item == null){
        System.out.println("Consumer Thread cannot process as Item is null");
        return false;
    }               
    return true;
}

public void run() {
    synchronized(itemQueue){
        try {
        if(itemQueue.hasMoreItems()){
            Item item = itemQueue.getNextItem();
            System.out.println("Consumer Thread "+ Thread.currentThread().getName() + " processing item: " +  item.getItemNo() + "-" + item.getItemName());

            processItem(item);              
                Thread.sleep(waitTimeInMillis);

        }else{

                itemQueue.wait();
            }} catch (InterruptedException e) {
                System.out.println("Consumer Thread INTERRUPTED");                  
            }

    }               
}

}


  public class Producer implements Runnable{

private ItemQueue itemQueue;
private int maxCount;
private int waitTimeInMillis;
public Producer(ItemQueue queue, int maxItems, int waitTimeInMillis){
    itemQueue = queue;  
    this.maxCount = maxItems;
    this.waitTimeInMillis = waitTimeInMillis;
}

public void run() { 
    synchronized(itemQueue){
        try {
        if(itemQueue.queueCount()>=maxCount){

                itemQueue.wait();               
        }
        else{
            produceNewItem();
            Thread.sleep(waitTimeInMillis);
        }
        } catch (InterruptedException e) {
            System.out.println("Producer Thread INTERRUPTED");
        }
    }       
}

private boolean produceNewItem(){
    Item  item = null;
    synchronized(ItemService.class){
        item = ItemService.getNextItem();       
    System.out.println("Producer Thread: " + Thread.currentThread().getName() + " adding item " + item.getItemNo() +"-"+item.getItemName()+"  to queue");
    itemQueue.addItem(item);
    return true;
}
}
}


  import java.util.LinkedList;

  public class ItemQueue {

private LinkedList<Item> itemList = new LinkedList<Item>();

public void addItem(Item item){
    itemList.add(item);
}

public Item getNextItem(){
    return itemList.poll();
}

public boolean hasMoreItems(){
    return  !itemList.isEmpty();
}

public int queueCount(){
    return itemList.size();
}
}


   public class Item {

private String itemName;
private int itemNo;
private String itemDescription;

public String getItemName() {
    return itemName;
}
public void setItemName(String itemName) {
    this.itemName = itemName;
}
public int getItemNo() {
    return itemNo;
}
public void setItemNo(int itemNo) {
    this.itemNo = itemNo;
}
public String getItemDescription() {
    return itemDescription;
}
public void setItemDescription(String itemDescription) {
    this.itemDescription = itemDescription;
}

public Item (int no, String name, String desc){
    itemName = name;
    itemNo = no;
    itemDescription = desc;
}
}


   import java.util.LinkedList;

  public class ItemService {

static LinkedList<Item> itemList = new LinkedList<Item>();
static int counter =0;

static{
    Item item = null;
    for(int i=0;i<10000;i++){
        item = new Item(i, "Name-"+i, "Description for item " + i);
        itemList.add(item);
    }

}

public static Item getNextItem(){
    if(counter < 9999){
        Item item= itemList.get(counter);
        counter++;
        return item;
    }
    else
    {
        System.out.println("Cannot PRODUCE any further items. all exhausted");
        return null;
    }

}

}

Upvotes: 0

Views: 651

Answers (2)

magulla
magulla

Reputation: 509

One of the points you are wrong at (maybe not the reason of your problem) is that in Producer/Consumer model you should start/run your consumers before producers.

Upvotes: 0

JB Nizet
JB Nizet

Reputation: 691775

You've still not said which issue you have, i.e. what you expect instead of the output you get, but there are two important problems with your code:

  1. Both kinds of threads wait on the itemQueue (becauses it's full, or because it's empty), but nowhere in the code is notify(), or preferrably, notifyAll(), called to wake up the waiting threads. This will inevitably lead to starvation. When a producer puts an item in the queue, it should call notifyAll() to wake up the waiting consumers. When a consumer removes an item from the queue, it should call notifyAll() to wake up the waiting producers.
  2. A wait() method must always be made inside a loop that checks whether the thread can really continue when it's woken up. Read the javadoc for Object.wait().

Another, less important problem, is that rather to force each thread to implement synchronization and wait()/notifyAll(), this should all be encapsulated inside the queue. The thread would just get items and put items in the queue, and be blocked by the queue until possible. In short, you should reimplement BlockingQueue.

Upvotes: 1

Related Questions