Reputation: 604
Does anyone know if there is a reliable way to set up a producer consumer type queue where the following holds:
1) Producer initially puts three objects on queue
2) Consumer consumes one item leaving two objects on queue
3) Control passes back to Producer and Producer puts one more item on - ( now 3 objects on queue again)
4) Consumer consumes one more item and the cycle continues hence
Please note I need to set up the solution manually without using any interfaces for an assignment I am working on. Any advice would be appreciated.
Is such fine tuning available with threads?
Upvotes: 0
Views: 159
Reputation: 11
I hope this solution gives you some help:
Explanation goes as :
Two separate threads (Producer thread and Consumer thread)
work in co-ordination with each other on a common queue(Here i have mentioned array). Producer puts three elements from a data array and consumer fetch one and remove that from same array.When data Array all elements are put in queue, consumer only fetches one-by-one. put()
and take()
are synchronized methods defined in a separate class Drop
.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerExample {
public static final Lock fileLock = new ReentrantLock();
public static final Condition condition = fileLock.newCondition();
public static String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too",
"abc",
"def",
"ghi",
"jkl",
"mno",
"pqr"
};
public static List<String> list = new ArrayList<String>();
public static boolean done = false;
public static void main(String[] args) {
Drop drop = new Drop();
Thread tProducer = new Thread(new Producer(drop));
Thread tConsumer = new Thread(new Consumer(drop));
try{
tProducer.start();
tConsumer.start();
}
catch(Exception ie){}
}
}
public class Consumer implements Runnable {
private Drop drop;
public Consumer(Drop drop) {
this.drop = drop;
}
public void run() {
try{
ProducerConsumerExample.fileLock.lock();
for (String message = drop.take();
! message.equals("DONE");
message = drop.take()) {
System.out.format("MESSAGE RECEIVED: %s%n", message);
ProducerConsumerExample.list.remove(0);
if(ProducerConsumerExample.done)
continue;
else{
ProducerConsumerExample.condition.signal();
System.out.println("Consumer is waiting");
ProducerConsumerExample.condition.await();
}
} catch (InterruptedException e) {}
}
}
catch(Exception e){
}
finally{
ProducerConsumerExample.fileLock.unlock();
}
}
}
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Producer implements Runnable {
private Drop drop;
public Producer(Drop drop) {
this.drop = drop;
}
public void run() {
try{
ProducerConsumerExample.fileLock.lock();
Random random = new Random();int check = 3;
for (int i = 0;
i <ProducerConsumerExample.importantInfo.length;
i++) {
if(i<check){
System.out.println("Putting message");
System.out.println(ProducerConsumerExample.importantInfo[i]);
drop.put(ProducerConsumerExample.importantInfo[i]);
}
else{
check = check+3;
i--;
ProducerConsumerExample.condition.signal();
System.out.println("Producer is waiting");
ProducerConsumerExample.condition.await();
}
}
drop.put("DONE");
ProducerConsumerExample.done =true;
ProducerConsumerExample.condition.signal();
System.out.println("Producer is waiting");
ProducerConsumerExample.condition.await();
}
catch(Exception e){
e.printStackTrace();
}
finally{
ProducerConsumerExample.fileLock.unlock();
}
}
}
import java.util.ArrayList;
import java.util.List;
public class Drop {
// Message sent from producer
// to consumer.
private String message;
public synchronized String take() {
System.out.println(ProducerConsumerExample.list.size());
return ProducerConsumerExample.list.get(0);
}
public synchronized void put(String message) {
// Store message.
ProducerConsumerExample.list.add(message);
}
}
Upvotes: 1
Reputation: 47729
What I would probably do is have a relatively mundane queue and also have a semaphore. Initialize the semaphore to -2. Have the producer increment the semaphore whenever it enqueues a request. Have the consumer decrement the semaphore prior to dequeuing a request. The consumer won't be able to do a decrement until the count gets to 1, and so there will always be 2 unserviced requests in the queue.
Upvotes: 3
Reputation: 4221
You can use an ArrayList and set the capacity to 3; then, anytime the Producer executes, the array size is checked; if the size is lower than 3, simply insert values to the ArrayList till the size is 3; if the size is 3; just call the notifyAll() method to finish its turn.
Similarly for the Consumer, if the size is 3, consume one of the values, and remove it from the ArrayList; if the size is less than 3, simply call the notifyAll() method to finish its turn.
This is basically how it works in a nutshell; the implementation will depend on what you plan to achieve with it.
Hope this helps.
Upvotes: 0
Reputation: 54790
Java 7 has a LinkedTransferQueue which sounds like what you are looking for, or check out the parent type BlockingQueue, I'm sure one of them will fit the bill.
Upvotes: 0