user3053227
user3053227

Reputation: 63

Why does this implementation of BlockingQueue using LinkedList fail?

I tried to use LinkedList with ReentrantLock and Condition to implement a basic methods(put and get) of a BlockingQueue. When I tested it with one producer thread and one consumer thread it always fails at some point.

I follow the example ConditionBoundedBuffer in Book "Java Concurrency in Practice" page 309, instead using array I use LinkedList.

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyBlockingQueue<T> {
    private int capacity;
    private LinkedList<T> queue;
    private ReentrantLock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public MyBlockingQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }


    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + ": get lock!");
            int size = queue.size();
            while (size == capacity) {
                System.out.println(Thread.currentThread().getName() + ": Full!");
                notFull.await();
            }
            queue.addLast(element);
            System.out.println(Thread.currentThread().getName() + ": Done putting " + element + " to queue.");
            notEmpty.signal();
        } finally {
            lock.unlock();
            System.out.println(Thread.currentThread().getName() + ": release lock!");
        }
    }

    public T get() throws InterruptedException {
        T result = null;
        lock.lock();
        try {
            System.err.println(Thread.currentThread().getName() + ": get lock!");
            int size = queue.size();
            while (size == 0) {
                System.err.println(Thread.currentThread().getName() + ": Empty!");
                notEmpty.await();
            }
            result = queue.removeFirst();
            System.err.println(Thread.currentThread().getName() + ": Done taking " + result + " from queue.");
            notFull.signal();
        } finally {
            lock.unlock();
            System.err.println(Thread.currentThread().getName() + ": release lock!");
        }
        return result;
    }


    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue myQ = new MyBlockingQueue(5);
        Producer producer1 = new Producer(myQ, true);
        Consumer consumer = new Consumer(myQ);
        Thread producer1Thread = new Thread(producer1, "producer");

        Thread consumer1Thread = new Thread(consumer, "consumer");
        producer1Thread.start();
        consumer1Thread.start();
        //
        producer1Thread.join();
        System.out.println("producerThread done.");
        consumer1Thread.join();
        System.out.println("All done.");
    }
}

class Producer implements Runnable {
    MyBlockingQueue<String> queue;
    boolean forEvenNumber;

    Producer(MyBlockingQueue<String> queue, boolean evenNumber) {
        this.queue = queue;
        forEvenNumber = evenNumber;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                if ((forEvenNumber && (i % 2 == 0)) || (!forEvenNumber && i % 2 != 0)) {

                    queue.put("" + i);

                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}

class Consumer implements Runnable {
    MyBlockingQueue<String> queue;

    Consumer(MyBlockingQueue queue) {
        this.queue = queue;
    }


    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            try {
                queue.get();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

I expect to see the consumer print out all numbers from the queue that put by the producer. But it always failed. One example output is like this:

producer: get lock!
producer: Done putting 0 to queue.
producer: release lock!
consumer: get lock!
consumer: Done taking 0 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 2 to queue.
consumer: get lock!producer: release lock!

consumer: Done taking 2 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 4 to queue.
producer: release lock!
consumer: get lock!
consumer: Done taking 4 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 6 to queue.
producer: release lock!
producer: get lock!
producer: Done putting 8 to queue.
producer: release lock!
consumer: get lock!
consumer: Done taking 6 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 10 to queue.
consumer: get lock!
consumer: Done taking 8 from queue.
producer: release lock!
consumer: release lock!
producer: get lock!
producer: Done putting 12 to queue.
producer: release lock!
consumer: get lock!
consumer: Done taking 10 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 14 to queue.
consumer: get lock!
producer: release lock!
consumer: Done taking 12 from queue.
producer: get lock!
consumer: release lock!
producer: Done putting 16 to queue.
consumer: get lock!producer: release lock!

consumer: Done taking 14 from queue.
consumer: release lock!
producer: get lock!
producer: Done putting 18 to queue.
consumer: get lock!
producer: release lock!
consumer: Done taking 16 from queue.
consumer: release lock!
consumer: get lock!
consumer: Done taking 18 from queue.
consumer: release lock!
consumer: get lock!
consumer: Empty!
producer: get lock!
producer: Done putting 20 to queue.
producer: release lock!
consumer: Empty!
producer: get lock!
producer: Done putting 22 to queue.
consumer: Empty!producer: release lock!

producer: get lock!
producer: Done putting 24 to queue.
producer: release lock!
consumer: Empty!
producer: get lock!
producer: Done putting 26 to queue.
consumer: Empty!
producer: release lock!
consumer: Empty!
producer: get lock!
producer: Done putting 28 to queue.
producer: release lock!
producer: get lock!
producer: Full!

Upvotes: 0

Views: 66

Answers (1)

Avi
Avi

Reputation: 2641

First thing I see:

int size = queue.size();
while (size == 0) {
    System.err.println(Thread.currentThread().getName() + ": Empty!");
    notEmpty.await();
}

If you never reassign size within this loop, you're going to loop forever, no matter whether notEmpty gets signalled or not. Instead, you might want something like the following:

while (queue.size() == 0) {
    System.err.println(Thread.currentThread().getName() + ": Empty!");
    notEmpty.await();
}

Additionally, you have:

int size = queue.size();
while (size == capacity) {
    System.out.println(Thread.currentThread().getName() + ": Full!");
    notFull.await();
}

where it should be:

while (queue.size() == capacity) {
    System.out.println(Thread.currentThread().getName() + ": Full!");
    notFull.await();
}

for the same reasons (size not being updated).

Upvotes: 1

Related Questions