Reputation: 2117
I guess I have not used synchronization properly. I get the below output .
I have consciously chosen not to use BlockingQueue OR java 5 concurrency features. I had written this so that I could learn synchronization and some basics.
Producer Thread: PRODUCER-1 adding item 0-Name-0 to queue
Consumer Thread CONSUMER-2 processing item: 0-Name-0
Producer Thread: PRODUCER-2 adding item 1-Name-1 to queue
Can you please help me understand where I am going wrong?
public class ProducerConsumerManager {
public static void main(String args[]){
ItemQueue itemQueue = new ItemQueue();
Producer producer1 = new Producer(itemQueue,15, 500);
Producer producer2 = new Producer(itemQueue,15, 1000);
Consumer consumer1 = new Consumer(itemQueue,500);
Consumer consumer2 = new Consumer(itemQueue,1500);
Thread producerThread1 = new Thread(producer1,"PRODUCER-1");
Thread producerThread2 = new Thread(producer2,"PRODUCER-2");
Thread consumerThread1 = new Thread(consumer1,"CONSUMER-1");
Thread consumerThread2 = new Thread(consumer2,"CONSUMER-2");
producerThread1.start();
producerThread2.start();
consumerThread1.start();
consumerThread2.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("The MAIN THREAD has been INTERRUPTED");
}
}
}
public class Consumer implements Runnable{
private ItemQueue itemQueue;
private int waitTimeInMillis;
public Consumer(ItemQueue queue, int waitTimeInMillis){
itemQueue = queue;
this.waitTimeInMillis = waitTimeInMillis;
}
private boolean processItem(Item item){
if(item == null){
System.out.println("Consumer Thread cannot process as Item is null");
return false;
}
return true;
}
public void run() {
synchronized(itemQueue){
try {
if(itemQueue.hasMoreItems()){
Item item = itemQueue.getNextItem();
System.out.println("Consumer Thread "+ Thread.currentThread().getName() + " processing item: " + item.getItemNo() + "-" + item.getItemName());
processItem(item);
Thread.sleep(waitTimeInMillis);
}else{
itemQueue.wait();
}} catch (InterruptedException e) {
System.out.println("Consumer Thread INTERRUPTED");
}
}
}
}
public class Producer implements Runnable{
private ItemQueue itemQueue;
private int maxCount;
private int waitTimeInMillis;
public Producer(ItemQueue queue, int maxItems, int waitTimeInMillis){
itemQueue = queue;
this.maxCount = maxItems;
this.waitTimeInMillis = waitTimeInMillis;
}
public void run() {
synchronized(itemQueue){
try {
if(itemQueue.queueCount()>=maxCount){
itemQueue.wait();
}
else{
produceNewItem();
Thread.sleep(waitTimeInMillis);
}
} catch (InterruptedException e) {
System.out.println("Producer Thread INTERRUPTED");
}
}
}
private boolean produceNewItem(){
Item item = null;
synchronized(ItemService.class){
item = ItemService.getNextItem();
System.out.println("Producer Thread: " + Thread.currentThread().getName() + " adding item " + item.getItemNo() +"-"+item.getItemName()+" to queue");
itemQueue.addItem(item);
return true;
}
}
}
import java.util.LinkedList;
public class ItemQueue {
private LinkedList<Item> itemList = new LinkedList<Item>();
public void addItem(Item item){
itemList.add(item);
}
public Item getNextItem(){
return itemList.poll();
}
public boolean hasMoreItems(){
return !itemList.isEmpty();
}
public int queueCount(){
return itemList.size();
}
}
public class Item {
private String itemName;
private int itemNo;
private String itemDescription;
public String getItemName() {
return itemName;
}
public void setItemName(String itemName) {
this.itemName = itemName;
}
public int getItemNo() {
return itemNo;
}
public void setItemNo(int itemNo) {
this.itemNo = itemNo;
}
public String getItemDescription() {
return itemDescription;
}
public void setItemDescription(String itemDescription) {
this.itemDescription = itemDescription;
}
public Item (int no, String name, String desc){
itemName = name;
itemNo = no;
itemDescription = desc;
}
}
import java.util.LinkedList;
public class ItemService {
static LinkedList<Item> itemList = new LinkedList<Item>();
static int counter =0;
static{
Item item = null;
for(int i=0;i<10000;i++){
item = new Item(i, "Name-"+i, "Description for item " + i);
itemList.add(item);
}
}
public static Item getNextItem(){
if(counter < 9999){
Item item= itemList.get(counter);
counter++;
return item;
}
else
{
System.out.println("Cannot PRODUCE any further items. all exhausted");
return null;
}
}
}
Upvotes: 0
Views: 651
Reputation: 509
One of the points you are wrong at (maybe not the reason of your problem) is that in Producer/Consumer model you should start/run your consumers before producers.
Upvotes: 0
Reputation: 691775
You've still not said which issue you have, i.e. what you expect instead of the output you get, but there are two important problems with your code:
notify()
, or preferrably, notifyAll()
, called to wake up the waiting threads. This will inevitably lead to starvation. When a producer puts an item in the queue, it should call notifyAll()
to wake up the waiting consumers. When a consumer removes an item from the queue, it should call notifyAll()
to wake up the waiting producers.Object.wait()
. Another, less important problem, is that rather to force each thread to implement synchronization and wait()
/notifyAll()
, this should all be encapsulated inside the queue. The thread would just get items and put items in the queue, and be blocked by the queue until possible. In short, you should reimplement BlockingQueue.
Upvotes: 1