ReD

Reputation: 1

My producer-consumer implementation is exceeding maximum size

I have implemented a Runnable Producer and a Runnable Consumer along with a Store class. The Store has a shelf with a fixed maximum size. Each producer adds an item and each consumer removes an item. I am using a CachedThreadPool to create both types of threads, and each thread runs in an infinite loop. I have also created two separate semaphores that are shared by the Runnable classes: the producers' semaphore starts with a number of permits equal to the Store's maximum shelf size, and the consumers' semaphore starts with zero permits.

When I run the code I get an IndexOutOfBoundsException at runtime. I understand the cause of that one: on the 'this.items.size()-1' line, two consumers read the same index and both try to remove it. But I also got another, unexpected result: the size of the List holding the items exceeds the shelf size. For example, in the code below I set maxShelfs to 5 and gave the producers 5 permits, yet I kept getting the output "Consumer size: 6". I then put a breakpoint in the addItem method for the case where the list size exceeds maxShelfs. Several runs confirmed that the list size does reach 6, although rarely. Why can the list size be 6 when the number of permits is limited to 5?

public class main {
    public static void main(String[] args) {
        Semaphore ps = new Semaphore(5);
        Semaphore cs = new Semaphore(0);
        Store s = new Store(5,ps,cs);
        ExecutorService es = Executors.newCachedThreadPool();
        for(int i =0;i<8;i++)
        {
            es.execute(new Producer(s,ps,cs));
        }
        for(int i =0;i<20;i++)
        {
            es.execute(new Consumer(s,ps,cs));
        }
    }
}


public class Store {
    private List<Object> items;
    public int maxShelfs;
    Semaphore ps;
    Semaphore cs;

    public Store (int maxShelfs,Semaphore ps, Semaphore cs)
    {
        this.ps = ps;
        this.cs = cs;
        this.maxShelfs = maxShelfs;
        items = new ArrayList<>();
    }

    public int getMaxShelfs()
    {
        return maxShelfs;
    }

    public List<Object> getItems()
    {
        return items;
    }

    public void addItem()
    {

        System.out.println("Producer size: "+this.items.size());
        this.items.add(new Object());
        if(items.size()==6) {
            System.out.println("test");
        }
    }

    public void removeItem()
    {

        System.out.println("Consumer size: "+this.items.size());
        this.items.remove(this.items.size()-1);
    }
}


public class Consumer implements Runnable{
    private Store s;
    Semaphore ps;
    Semaphore cs;

    public Consumer(Store s, Semaphore ps, Semaphore cs)
    {
        this.ps = ps;
        this.cs = cs;
        this.s = s;
    }

    public void run()
    {
        while(true)
        {
            try
            {
                cs.acquire();
//                Thread.sleep(20);
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
            if (s.getItems().size() > 0)
            {
                s.removeItem();
            }
            ps.release();
        }
    }
}


public class Producer implements Runnable{
    private Store s;
    Semaphore ps;
    Semaphore cs;

    public Producer (Store s,Semaphore ps,Semaphore cs)
    {
        this.ps = ps;
        this.cs = cs;
        this.s = s;
    }

    public void run()
    {
        while(true)
        {
            try
            {
                ps.acquire();
//                Thread.sleep(20);
                if (s.getItems().size() < s.getMaxShelfs()) {
                    s.addItem();
                }
                cs.release();

            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }



        }
    }
}

I am trying to learn concurrency and have not yet learned much about try and catch. My initial guess was a rare case where several threads read the list size in the if condition of the run method at overlapping times and then act on a stale value, but I was not able to come up with a test case that shows this. (I was told that List is not thread-safe and that I should use a concurrent data structure.)

The case where list size is 6

Upvotes: 0

Views: 90

Answers (2)

Solomon Slow

Reputation: 27190

Your code lacks mutual exclusion. In a nutshell, you have a stateful object, the Store, you have multiple threads that want to see the state, and at least one of those threads wants to change the state.

Changing the state (e.g., addItem(), removeItem()) takes more than a single step, and if one thread looks at the state at the moment when some other thread is halfway through changing it, the thread that's looking can see something inconsistent. Almost anything could happen as a result: the program could throw an exception, items could disappear from the list, duplicates could appear, or the list's size could drift away from what the semaphore permits allow. A data race in pure Java code should not crash the JVM itself, but otherwise you cannot rely on any particular behavior from your program.

Mutual exclusion looks like this:

import java.util.concurrent.locks.ReentrantLock;

public class Store {
    private final ReentrantLock lock = new ReentrantLock();
    ...
    public void addItem() {
        lock.lock();
        try {
            ...access/modify shared state variables...
        }
        finally {
            lock.unlock();
        }
    }
    ...
}

The lock.lock() call causes the calling thread to "own" the lock, and only one thread will ever be allowed to own the same lock at the same time. If thread A calls lock.lock() while thread B "owns" it, the lock.lock() call in thread A simply will not return until thread B relinquishes ownership by calling lock.unlock().
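As a minimal sketch of how that pattern might be applied to the question's Store, the version below moves the capacity check and the list update into the same critical section. The class name `LockedStore` and the methods `tryAdd`, `tryRemove`, `maxObserved`, and `stressTest` are hypothetical names chosen for illustration; they are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration: a Store variant whose size check and update share one lock.
class LockedStore {
    private final ReentrantLock lock = new ReentrantLock();
    private final List<Object> items = new ArrayList<>();
    private final int maxShelfs;
    private int maxObserved = 0; // largest size ever reached; guarded by lock

    LockedStore(int maxShelfs) {
        this.maxShelfs = maxShelfs;
    }

    // Adds an item only if there is room; returns false when the shelf is full.
    boolean tryAdd() {
        lock.lock();
        try {
            if (items.size() >= maxShelfs) {
                return false;
            }
            items.add(new Object());
            maxObserved = Math.max(maxObserved, items.size());
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Removes the last item if there is one; returns false when the shelf is empty.
    boolean tryRemove() {
        lock.lock();
        try {
            if (items.isEmpty()) {
                return false;
            }
            items.remove(items.size() - 1);
            return true;
        } finally {
            lock.unlock();
        }
    }

    int maxObserved() {
        lock.lock();
        try {
            return maxObserved;
        } finally {
            lock.unlock();
        }
    }

    // Runs producer and consumer threads for a fixed number of attempts each
    // and reports the largest list size that was ever reached.
    static int stressTest(int capacity, int threadPairs, int attempts) {
        LockedStore store = new LockedStore(capacity);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threadPairs; i++) {
            workers.add(new Thread(() -> { for (int n = 0; n < attempts; n++) store.tryAdd(); }));
            workers.add(new Thread(() -> { for (int n = 0; n < attempts; n++) store.tryRemove(); }));
        }
        for (Thread t : workers) t.start();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return store.maxObserved();
    }

    public static void main(String[] args) {
        System.out.println("Largest size observed: " + stressTest(5, 4, 10_000));
    }
}
```

Because no thread can run between the `items.size()` check and the `items.add(...)` call, the largest size reported should never exceed the capacity, no matter how many threads are started.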

Locking and unlocking also do something that goes by several names: "synchronization," "memory visibility," and "happens-before relationship." On a system with more than one CPU (which is most workstations/laptops/servers/mobile devices these days), using locks ensures that changes made by code running on one CPU are communicated to code running on other CPUs. In a nutshell: every change that thread B made before it called lock.unlock() is guaranteed to be visible to thread A after thread A subsequently completes a lock.lock() call.
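The visibility guarantee can be illustrated with a plain `int` field that several threads increment. This is only a sketch: `LockedCounter` and `countWithThreads` are hypothetical names, and the field is safe to share only because every read and write happens while the lock is held.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration: a plain int shared safely because every access holds the lock.
class LockedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0; // not volatile, not atomic; guarded by lock

    void increment() {
        lock.lock();
        try {
            count++; // read-modify-write happens entirely inside the critical section
        } finally {
            lock.unlock();
        }
    }

    int get() {
        lock.lock();
        try {
            return count; // sees every change made before earlier unlock() calls
        } finally {
            lock.unlock();
        }
    }

    // Starts the given number of threads, each incrementing the shared counter,
    // waits for all of them, and returns the final count.
    static int countWithThreads(int threads, int incrementsPerThread) {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 100_000));
    }
}
```

With the lock in place, the final count should equal the number of threads multiplied by the increments per thread. Removing the lock calls would allow increments to be lost, which is the same kind of lost update that can let the list size and the semaphore permits disagree.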

There are some more advanced approaches to solving synchronization problems and mutating shared state, but as a beginner, your focus should be on using mutex locks.

As a complete beginner, you should start with having just one lock. There are some very good reasons for having more than one lock, but until you get the hang of it...

  • Lock the lock before writing or reading any shared variable,
  • Don't unlock the lock until all of the shared variables are in a state that other threads should be allowed to see.

Extra credit:

  • Don't sleep() or do anything else that takes a long time while keeping the lock locked. If you think you need to wait for something while locked, maybe there's a way you can (a) back off, release the lock without changing anything, (b) wait, and then (c) try again.
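The back off, wait, and try again idea might look roughly like the sketch below. `RetryStore`, `retryUntil`, and `produceAndConsume` are hypothetical names, and the one-millisecond sleep is an arbitrary choice for illustration. The point is only that the sleep happens after the lock has been released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical illustration: never wait while holding the lock.
class RetryStore {
    private final ReentrantLock lock = new ReentrantLock();
    private final List<Object> items = new ArrayList<>();
    private final int capacity;

    RetryStore(int capacity) {
        this.capacity = capacity;
    }

    // Returns false (changing nothing) when the store is full.
    boolean tryAdd() {
        lock.lock();
        try {
            if (items.size() >= capacity) return false;
            items.add(new Object());
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Returns false (changing nothing) when the store is empty.
    boolean tryRemove() {
        lock.lock();
        try {
            if (items.isEmpty()) return false;
            items.remove(items.size() - 1);
            return true;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }

    // (a) the attempt backs off by returning false, (b) wait with the lock released, (c) try again.
    static void retryUntil(BooleanSupplier attempt) {
        while (!attempt.getAsBoolean()) {
            try {
                Thread.sleep(1); // the lock is not held here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // One producer adds itemCount items and one consumer removes itemCount items;
    // returns the number of items left in the store afterwards.
    static int produceAndConsume(int capacity, int itemCount) {
        RetryStore store = new RetryStore(capacity);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < itemCount; i++) retryUntil(store::tryAdd);
        });
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < itemCount; i++) retryUntil(store::tryRemove);
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println("Items left: " + produceAndConsume(2, 50));
    }
}
```

Polling like this is easy to reason about but wastes some time sleeping. Once the basics are comfortable, a `Condition` obtained from the lock, or a ready-made class such as `java.util.concurrent.ArrayBlockingQueue`, is the more usual way to wait for space or for an item.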

Upvotes: 0

Andrey Smelik

Reputation: 1266

Your store is not thread-safe. Because both the producers and the consumers modify the store, their add and remove operations must not run in parallel. You need to serialize them with a mutex. If you want to use a semaphore for this, you need a single shared instance with one permit, so that only one thread at a time can pass through:

Semaphore semaphore = new Semaphore(1);

You need to use this semaphore when adding elements:

semaphore.acquire();
try {
    if (s.getItems().size() < s.getMaxShelfs()) {
        s.addItem();
    }
} finally {
    semaphore.release();
}

The same semaphore must be used when removing elements:

semaphore.acquire();
try {
    if (s.getItems().size() > 0) {
        s.removeItem();
    }
} finally {
    semaphore.release();
}
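One way the one-permit semaphore could be combined with the question's two counting semaphores is the classic bounded-buffer arrangement, sketched below. This is an assumption about how the pieces might fit together, not code from the question: `SemaphoreStore`, `put`, `take`, and `run` are hypothetical names, `freeSlots` and `filledSlots` play the roles of the question's `ps` and `cs`, and `mutex` is the one-permit semaphore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical illustration: two counting semaphores plus a one-permit semaphore as a mutex.
class SemaphoreStore {
    private final Semaphore freeSlots;                       // role of ps in the question
    private final Semaphore filledSlots = new Semaphore(0);  // role of cs in the question
    private final Semaphore mutex = new Semaphore(1);        // only one thread touches the list
    private final List<Object> items = new ArrayList<>();
    private int maxObserved = 0;                             // guarded by mutex

    SemaphoreStore(int capacity) {
        freeSlots = new Semaphore(capacity);
    }

    void put() throws InterruptedException {
        freeSlots.acquire();              // wait until there is room
        mutex.acquireUninterruptibly();   // short critical section, so no interruption here
        try {
            items.add(new Object());
            maxObserved = Math.max(maxObserved, items.size());
        } finally {
            mutex.release();
        }
        filledSlots.release();            // announce one more item
    }

    void take() throws InterruptedException {
        filledSlots.acquire();            // wait until there is an item
        mutex.acquireUninterruptibly();
        try {
            items.remove(items.size() - 1);
        } finally {
            mutex.release();
        }
        freeSlots.release();              // announce one more free slot
    }

    // Runs equal numbers of producers and consumers; returns {largest size seen, final size}.
    static int[] run(int capacity, int threadPairs, int itemsPerThread) {
        SemaphoreStore store = new SemaphoreStore(capacity);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threadPairs; i++) {
            workers.add(new Thread(() -> {
                try {
                    for (int n = 0; n < itemsPerThread; n++) store.put();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            workers.add(new Thread(() -> {
                try {
                    for (int n = 0; n < itemsPerThread; n++) store.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : workers) t.start();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // All workers have been joined, so reading the fields directly is safe here.
        return new int[] { store.maxObserved, store.items.size() };
    }

    public static void main(String[] args) {
        int[] result = run(5, 4, 1_000);
        System.out.println("Largest size: " + result[0] + ", final size: " + result[1]);
    }
}
```

In this arrangement the counting semaphores decide whether a thread may add or remove at all, so the `size()` checks from the question are no longer needed, and the mutex makes sure the `ArrayList` itself is only ever touched by one thread at a time.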

Upvotes: 0
