TechnoJab
TechnoJab

Reputation: 35

Where's the problem with this BoundedBuffer class?

Properties of the bounded buffer class I'm trying to build...

Now for the problem...

Problem: I can't seem to comment out the synchronized(this) blocks in the code below. The whole point of using AtomicInteger as pointers I thought was to avoid doing this.

Commenting out the synchronized(this) blocks yields in consumers missing some items that producers have put in. If I include the synchronized(this) block, everything's great and every single thing produced is consumed.

What am I missing?

public class BoundedBuffer<T> {
    private static final int BUFFER_SIZE = Short.MAX_VALUE+1;
    private AtomicReferenceArray<T> m_buffer = null;
    private Semaphore m_full = new Semaphore(BUFFER_SIZE);
    private Semaphore m_empty = new Semaphore(0);
    private AtomicInteger m_writePointer = new AtomicInteger();
    private AtomicInteger m_readPointer = new AtomicInteger();

    public BoundedBuffer() {
        m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
    }

    public static int safeGetAndIncrement(AtomicInteger i) {
        int oldValue = 0, newValue = 0;
        do {
            oldValue = i.get();
            newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
        } while (!i.compareAndSet(oldValue, newValue));
        return oldValue;
    }

    public void add(T data) throws InterruptedException {
        m_full.acquire();
        synchronized (this) { // << Commenting this doesn't work
            // CAS-based overflow handling
            m_buffer.set(safeGetAndIncrement(m_writePointer),data);
        }
        m_empty.release();
    }

    public T get() throws InterruptedException {
        T data = null;
        m_empty.acquire();
        synchronized (this) { // << Commenting this doesn't work
            // CAS-based overflow handling
            data = m_buffer.get(safeGetAndIncrement(m_readPointer));
        }
        m_full.release();
        return data;
    }
}

Upvotes: 1

Views: 688

Answers (1)

Mike Tunnicliffe
Mike Tunnicliffe

Reputation: 10772

There could be a problem where the get() from the array is not atomic with the increment when the synchronized block is removed. The breaking scenario I speculate on requires the producer to be overrunning the consumers, then you could have the producer overwrite an array entry that has not been read yet IF the semaphore release was triggered by an out-of-order read.

Consider the situation where the buffer is full (writer index is at N, reader index is at N+1) and 2 threads are trying to read from the buffer. (Assume that N is not close to the wraparound point for simplicity.)

Thread 1 receives the index N+1 from which to read its item.

Thread 2 receives the index N+2 from which to read its item.

Due to a fluke of scheduling Thread 2 gets from the buffer array first and releases the m_full semaphore before Thread 1 gets its item from the array.

Thread 3 (a producer) wakes up and writes an item into the next available slot N+1 in the buffer, also before Thread 1 has read from the buffer.

Thread 1 then gets the item at index N+1, but has missed the item it wanted.

Upvotes: 1

Related Questions