ksiomelo

Reputation: 1908

Creating a Synchronized Buffer for Producer/Consumer pattern in Java

I implemented a buffer for the producer/consumer pattern. However, it seems that the consumer never acquires the lock, so starvation occurs. I can't identify why this happens, since both put() and take() seem to release the lock properly.

I know there are BlockingQueue and other nice implementations, but I want to implement this using wait() and notify() as an exercise.

public class ProducerConsumerRaw {

    public static void main(String[] args) {
        IntBuffer buffer = new IntBuffer(8);

        ConsumerRaw consumer = new ConsumerRaw(buffer);
        ProducerRaw producer = new ProducerRaw(buffer);

        Thread t1 = new Thread(consumer);
        Thread t2 = new Thread(producer);

        t1.start();
        t2.start();

    }
}

class ConsumerRaw implements Runnable{
    private final IntBuffer buffer;

    public ConsumerRaw(IntBuffer b){
        buffer = b;
    }

    public void run() {
        while(!buffer.isEmpty()) {
            int i = buffer.take();
            System.out.println("Consumer reads "+i); // output may interleave out of order with the producer's prints
        }
    }
}


class ProducerRaw implements Runnable{
    private final IntBuffer buffer;

    ProducerRaw(IntBuffer b) {
        this.buffer = b;
    }

    public void run(){
        for (int i = 0; i < 20; i++) {
            int n = (int) (Math.random()*100);
            buffer.put(n);
            System.out.println("Producer puts "+n);
        }
    }
}


class IntBuffer{

    private final int[] storage;
    private volatile int end;
    private volatile int start;

    public IntBuffer(int size) {
        this.storage = new int[size];
        end = 0;
        start = 0;
    }


    public void put(int n) { // appends n at the end index
        synchronized(storage) {
            boolean full = (start == (end+storage.length+1)%storage.length);
            while(full){ // queue is full
                try {
                    storage.notifyAll(); 
                    storage.wait();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            this.storage[end] = n;
            end = incrementMod(end);
            storage.notifyAll();
        }
    }


    public int take(){

        synchronized(storage) {
            while (end == start) { // empty queue
                try {
                    storage.notifyAll(); // notify waiting producers
                    storage.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            int index = start;
            start = incrementMod(start);
            storage.notifyAll(); // notify waiting producers
            return this.storage[index];

        }
    }

    private int incrementMod(int index) {
        synchronized (storage) {
            if (index == storage.length-1) return 0;
            else return index+1;
        }
    }

    public boolean isEmpty(){
        synchronized (storage) {
            return (start == end);
        }
    }

}

Upvotes: 5

Views: 5593

Answers (1)

Jon Skeet

Reputation: 1501636

This is at least one problem, in your put method:

boolean full = (start == (end+storage.length+1)%storage.length);           
while(full){ // queue is full
    // Code that doesn't change full
}

If full is ever initialized as true, how do you expect the loop to end?
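One way to address this is to re-evaluate the condition on every pass through the loop rather than caching it in a local variable. The following is only a minimal sketch, not your exact class: the name RecheckingIntBuffer and the isFull() helper are made up for illustration, and it assumes the "keep one slot empty" ring layout your start/end check implies (so it allocates one extra slot to hold the requested number of values).

```java
class RecheckingIntBuffer {
    private final int[] storage;
    private int start = 0; // index of the next element to take
    private int end = 0;   // index of the next free slot

    RecheckingIntBuffer(int size) {
        // Assumption: one slot is kept empty so that start == end always means "empty".
        this.storage = new int[size + 1];
    }

    // Must only be called while holding the lock on storage.
    private boolean isFull() {
        return (end + 1) % storage.length == start;
    }

    public void put(int n) throws InterruptedException {
        synchronized (storage) {
            while (isFull()) {       // recomputed after every wakeup
                storage.wait();      // releases the lock while waiting
            }
            storage[end] = n;
            end = (end + 1) % storage.length;
            storage.notifyAll();     // wake any consumer waiting for data
        }
    }

    public int take() throws InterruptedException {
        synchronized (storage) {
            while (start == end) {   // empty; also recomputed after every wakeup
                storage.wait();
            }
            int value = storage[start];
            start = (start + 1) % storage.length;
            storage.notifyAll();     // wake any producer waiting for space
            return value;
        }
    }
}
```

Because start and end are only touched inside synchronized (storage), they don't need to be volatile here, and the notifyAll() call before each wait() isn't needed either.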

The other problem is this loop, in the consumer:

while(!buffer.isEmpty()) {
    int i = buffer.take();
    System.out.println("Consumer reads "+i);
}

You're assuming the producer never lets the buffer get empty - if the consumer starts before the producer, it will stop immediately.

Instead, you want some way of telling the buffer that you've stopped producing. The consumer should keep taking until the queue is empty and won't receive any more data.
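A rough sketch of that idea, under some assumptions: the close() method, the OptionalInt return type for take(), and the class names ClosableIntBuffer and ClosableBufferDemo are all invented here for illustration. It also uses synchronized methods and an element count instead of your synchronized (storage) blocks, purely to keep the example short.

```java
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;

class ClosableIntBuffer {
    private final int[] storage;
    private int start = 0;          // index of the next element to take
    private int count = 0;          // number of elements currently stored
    private boolean closed = false; // set once the producer is finished

    ClosableIntBuffer(int capacity) {
        storage = new int[capacity];
    }

    public synchronized void put(int n) throws InterruptedException {
        if (closed) throw new IllegalStateException("buffer is closed");
        while (count == storage.length) { // full: wait for a consumer
            wait();
        }
        storage[(start + count) % storage.length] = n;
        count++;
        notifyAll();
    }

    // Called by the producer once it has nothing more to put.
    public synchronized void close() {
        closed = true;
        notifyAll(); // wake consumers blocked on an empty buffer
    }

    // Returns the next value, or empty once the buffer is closed and drained.
    public synchronized OptionalInt take() throws InterruptedException {
        while (count == 0 && !closed) {
            wait();
        }
        if (count == 0) return OptionalInt.empty();
        int value = storage[start];
        start = (start + 1) % storage.length;
        count--;
        notifyAll();
        return OptionalInt.of(value);
    }
}

class ClosableBufferDemo {
    // Runs one producer and one consumer; returns how many values were consumed.
    static int run(int items) throws InterruptedException {
        ClosableIntBuffer buffer = new ClosableIntBuffer(8);
        AtomicInteger received = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                // Keep taking until the buffer reports "closed and empty".
                while (buffer.take().isPresent()) {
                    received.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                buffer.close(); // signal that no more data will arrive
            }
        });

        consumer.start();
        producer.start();
        producer.join();
        consumer.join();
        return received.get();
    }
}
```

With this shape the consumer no longer depends on which thread starts first: an empty buffer just makes it wait, and only close() plus an empty buffer ends its loop.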

Upvotes: 7
