Reputation: 1030
Below is my implementation for the PC problem
public class CircularQueue {
Queue <Integer>queue = new LinkedList<Integer>();
final int LIMIT = 10;
static Semaphore semProd = new Semaphore(1);
static Semaphore semConsu = new Semaphore(0);
public void enqueue(int productId) throws InterruptedException{
semProd.acquire();
queue.add(productId);
System.out.println(Thread.currentThread().getName()+" Putting(In Q) Product ID:"+productId);
semConsu.release();
}
public int deueue() throws InterruptedException{
semConsu.acquire();
int productID = (int) queue.remove();
System.out.println(Thread.currentThread().getName()+" Getting (In Q) Product ID:"+productID);
semProd.release();
return productID;
}
}
//producer class
public class Producer implements Runnable{
CircularQueue cQueue ;
public Producer(CircularQueue queue){
this.cQueue = queue;
}
public void run(){
while(true){
for(int i =0 ; i < 5 ;i++){
try {
cQueue.enqueue(i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}}}
//consumer class
public class Consumer implements Runnable{
CircularQueue cQueue ;
public Consumer(CircularQueue cQueue){
this.cQueue = cQueue;
}
public void run(){
try {
while(true){
int item = cQueue.deueue();
Thread.sleep(2000);}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}}
//Driver Class
public class DriverClass {
public static void main(String args[]){
CircularQueue cQueue = new CircularQueue();
new Thread(new Producer(cQueue)).start();
new Thread(new Consumer(cQueue)).start();
}}
1) How do I check if my implementation is correct 2)if I want to edit solution for multiple consumer and multiple producer then how should I change implementation
is increasing number of semProduce and sem consume enough?
static Semaphore semProd = new Semaphore(4);//4 producer
static Semaphore semConsu = new Semaphore(3);//3 consumer
Upvotes: 2
Views: 3633
Reputation: 133
Though it is late, here is a working example which supports multiple producers and consumers:
Create a bounded Buffer with 3 semaphores
First we'll create a Buffer Class:
class SemaphoreBuffer {
private Queue<Integer> queue = new LinkedList<>();
/**
* This semaphore is used to keep track of the number of elements in the queue
*/
private Semaphore count = new Semaphore(0);
/**
* This acts as a mutex lock
*/
private Semaphore mutex = new Semaphore(1);
/**
* This is the limit of the queue
*
*/
private Semaphore limit;
/**
* @param limit
*/
public SemaphoreBuffer(int limit) {
super();
this.limit = new Semaphore(limit);
}
public void add(Integer item) throws InterruptedException {
// This order is very important
// Limit should be acquired first and only then mutex.
limit.acquire();
mutex.acquire();
queue.add(item);
mutex.release(); // releasing the lock
count.release(); // increasing the count
}
public Integer remove() throws InterruptedException {
Integer item = null;
count.acquire(); // acquire this first otherwise consumer thread may run before producer
mutex.acquire();
item = queue.remove();
mutex.release();
limit.release();
return item;
}
public int getCount() {
return count.availablePermits();
}
public int getLimit() {
return limit.availablePermits();
}
public int getMutex() {
return mutex.availablePermits();
}
}
Next, we'll create 2 classes
Producer
Consumer
class SemaphoreProducer implements Runnable {
private SemaphoreBuffer buffer;
private String name;
/**
* @param buffer
* @param name
*/
public SemaphoreProducer(SemaphoreBuffer buffer, String name) {
super();
this.buffer = buffer;
this.name = name;
}
@Override
public void run() {
Random random = new SecureRandom();
while (true) {
try {
int item = random.nextInt(100);
buffer.add(item);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer Class:
class SemaphoreConsumer implements Runnable {
private SemaphoreBuffer buffer;
private String name;
/**
* @param buffer
* @param name
*/
public SemaphoreConsumer(SemaphoreBuffer buffer, String name) {
super();
this.buffer = buffer;
this.name = name;
}
@Override
public void run() {
while (true) {
try {
int item = buffer.remove();
System.out.println(Thread.currentThread().getName() + " Consumer: " + name + " Removed: " + item
+ " Queue Size: " + buffer.getCount());
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
In the above code, you can change the sleep timers and check results. That's it, our producer and consumer classes and architecture is ready! Next we'll create a Runner class to execute the code and check. Runner Class:
.
public class ProducerConsumerWithSemaphore {
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
SemaphoreBuffer buffer = new SemaphoreBuffer(15);
Thread producer = new Thread(new SemaphoreProducer(buffer, "PRD1"));
Thread producer2 = new Thread(new SemaphoreProducer(buffer, "PRD2"));
Thread consumerA = new Thread(new SemaphoreConsumer(buffer, "A"));
Thread consumerB = new Thread(new SemaphoreConsumer(buffer, "B"));
Thread consumerC = new Thread(new SemaphoreConsumer(buffer, "C"));
producer.start();
producer2.start();
consumerA.start();
consumerB.start();
consumerC.start();
producer.join();
producer2.join();
consumerA.join();
consumerB.join();
consumerC.join();
}
}
Upvotes: 1
Reputation: 24857
For a general-purpose, bounded, multi-producer/consumer blocking queue with semaphores, you need three of them. One to count the number of free spaces in the queue, (initialized to the LIMIT of the queue), one to count the number of items in the queue, (initialized to zero), and another to protect the queue from multiple access, (initialized to 1, to act as a mutex).
Pseudocode:
Producer: wait(free); wait(mutex); queue.push(newItem); send(mutex); send(items);
Consumer: wait(items); wait(mutex); result=(queue.pop); send(mutex); send(free); return result;
Upvotes: 3