Reputation: 63
I tried to implement the basic methods (put and get) of a BlockingQueue using a LinkedList with ReentrantLock and Condition. When I test it with one producer thread and one consumer thread, it always fails at some point.
I followed the ConditionBoundedBuffer example in the book "Java Concurrency in Practice" (page 309), but used a LinkedList instead of an array.
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyBlockingQueue<T> {
private int capacity;
private LinkedList<T> queue;
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public void put(T element) throws InterruptedException {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + ": get lock!");
int size = queue.size();
while (size == capacity) {
System.out.println(Thread.currentThread().getName() + ": Full!");
notFull.await();
}
queue.addLast(element);
System.out.println(Thread.currentThread().getName() + ": Done putting " + element + " to queue.");
notEmpty.signal();
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + ": release lock!");
}
}
public T get() throws InterruptedException {
T result = null;
lock.lock();
try {
System.err.println(Thread.currentThread().getName() + ": get lock!");
int size = queue.size();
while (size == 0) {
System.err.println(Thread.currentThread().getName() + ": Empty!");
notEmpty.await();
}
result = queue.removeFirst();
System.err.println(Thread.currentThread().getName() + ": Done taking " + result + " from queue.");
notFull.signal();
} finally {
lock.unlock();
System.err.println(Thread.currentThread().getName() + ": release lock!");
}
return result;
}
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue myQ = new MyBlockingQueue(5);
Producer producer1 = new Producer(myQ, true);
Consumer consumer = new Consumer(myQ);
Thread producer1Thread = new Thread(producer1, "producer");
Thread consumer1Thread = new Thread(consumer, "consumer");
producer1Thread.start();
consumer1Thread.start();
//
producer1Thread.join();
System.out.println("producerThread done.");
consumer1Thread.join();
System.out.println("All done.");
}
}
class Producer implements Runnable {
MyBlockingQueue<String> queue;
boolean forEvenNumber;
Producer(MyBlockingQueue<String> queue, boolean evenNumber) {
this.queue = queue;
forEvenNumber = evenNumber;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
if ((forEvenNumber && (i % 2 == 0)) || (!forEvenNumber && i % 2 != 0)) {
queue.put("" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
MyBlockingQueue<String> queue;
Consumer(MyBlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 50; i++) {
try {
queue.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
I expect the consumer to print all the numbers that the producer put into the queue, but it always fails. One example output looks like this:
producer: get lock!
producer: Done putting 0 to queue.
producer: release lock!
consumer: get lock!
consumer: Done taking 0 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 2 to queue.
consumer: get lock!producer: release lock!
consumer: Done taking 2 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 4 to queue.
producer: release lock!
consumer: get lock!
consumer: Done taking 4 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 6 to queue.
producer: release lock!
producer: get lock!
producer: Done putting 8 to queue.
producer: release lock!
consumer: get lock!
consumer: Done taking 6 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 10 to queue.
consumer: get lock!
consumer: Done taking 8 from queue.
producer: release lock!
consumer: release lock!
producer: get lock!
producer: Done putting 12 to queue.
producer: release lock!
consumer: get lock!
consumer: Done taking 10 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 14 to queue.
consumer: get lock!
producer: release lock!
consumer: Done taking 12 from queue.
producer: get lock!
consumer: release lock!
producer: Done putting 16 to queue.
consumer: get lock!producer: release lock!
consumer: Done taking 14 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 18 to queue.
consumer: get lock!
producer: release lock!
consumer: Done taking 16 from queue.
consumer: release lock!
consumer: get lock!
consumer: Done taking 18 from queue.
consumer: release lock!
consumer: get lock!
consumer: Empty!
producer: get lock!
producer: Done putting 20 to queue.
producer: release lock!
consumer: Empty!
producer: get lock!
producer: Done putting 22 to queue.
consumer: Empty!producer: release lock!
producer: get lock!
producer: Done putting 24 to queue.
producer: release lock!
consumer: Empty!
producer: get lock!
producer: Done putting 26 to queue.
consumer: Empty!
producer: release lock!
consumer: Empty!
producer: get lock!
producer: Done putting 28 to queue.
producer: release lock!
producer: get lock!
producer: Full!
Upvotes: 0
Views: 66
Reputation: 2641
First thing I see:
int size = queue.size();
while (size == 0) {
System.err.println(Thread.currentThread().getName() + ": Empty!");
notEmpty.await();
}
size is a local snapshot taken once, before the loop. Since you never reassign it inside the loop, the condition never changes and you will loop forever, whether or not notEmpty gets signalled. Instead, you might want something like the following:
while (queue.size() == 0) {
System.err.println(Thread.currentThread().getName() + ": Empty!");
notEmpty.await();
}
Additionally, you have:
int size = queue.size();
while (size == capacity) {
System.out.println(Thread.currentThread().getName() + ": Full!");
notFull.await();
}
where it should be:
while (queue.size() == capacity) {
System.out.println(Thread.currentThread().getName() + ": Full!");
notFull.await();
}
for the same reasons (size not being updated).
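Putting both fixes together, here is a minimal sketch of what the corrected queue might look like. The class name FixedBlockingQueue is just a placeholder for this example (it is the question's MyBlockingQueue with the logging removed), and I have not run it against your Producer and Consumer classes, so treat it as an illustration rather than a drop-in replacement:

```java
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Placeholder name: the question's MyBlockingQueue with the two loop conditions fixed.
class FixedBlockingQueue<T> {
    private final int capacity;
    private final LinkedList<T> queue = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    FixedBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            // Re-read the size on every iteration: await() releases the lock,
            // so the queue may have changed by the time it returns.
            while (queue.size() == capacity) {
                notFull.await();
            }
            queue.addLast(element);
            notEmpty.signal(); // wake one waiting consumer, if any
        } finally {
            lock.unlock();
        }
    }

    public T get() throws InterruptedException {
        lock.lock();
        try {
            // Same idea: check the live queue state, not a stale local copy.
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T result = queue.removeFirst();
            notFull.signal(); // wake one waiting producer, if any
            return result;
        } finally {
            lock.unlock();
        }
    }
}
```

The while loop (rather than an if) is still needed, because await() can return spuriously or after another thread has already changed the queue, so the condition has to be re-evaluated against the queue itself each time.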
Upvotes: 1