just ME

Reputation: 1827

Java one producer and two consumers

I have a producer and two consumers. I want to show how the consumers take values from the producer and display them. The problem is that in my code only the second consumer displays the items from the producer. How can I solve this? Here is the code:

  public static void main(String[] args) throws Exception {
    // Object of a class that has both produce()
    // and consume() methods
    final ProdCons pc = new ProdCons();

    // Create producer thread
    Thread t1 = new Thread(new Runnable() {
        public void run() {
            try {
                pc.produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    // Create consumer thread
    Thread t2 = new Thread(new Runnable() {
        public void run() {
            try {
                pc.consume(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    Thread t3 = new Thread(new Runnable() {
        public void run() {
            try {
                pc.consume(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    // Start all three threads
    t1.start();
    t2.start();
    t3.start();

    // Wait for all three threads to finish
    t1.join();
    t2.join();
    t3.join();
    }

And the ProdCons class:

import java.util.LinkedList;

public class ProdCons {
    // Create a list shared by producer and consumers.
    // Capacity of the list is 2.
    LinkedList<Integer> list = new LinkedList<Integer>();
    int capacity = 2;

    // Function called by producer thread
    public void produce() throws InterruptedException {
        int value = 0;
        while (true) {
            synchronized (this) {
                // producer thread waits while the list is full
                while (list.size() == capacity)
                    wait();

                System.out.println("Producer produced-" + value);

                // insert the job into the list
                list.add(value++);

                // notifies the consumer thread that
                // it can now start consuming
                notify();

                // makes the working of the program easier
                // to understand
                Thread.sleep(1000);
            }
        }
    }

    // Function called by consumer threads
    public void consume(int thread) throws InterruptedException {
        while (true) {
            synchronized (this) {
                // consumer thread waits while the list is empty
                while (list.size() == 0)
                    wait();

                // retrieve the first job in the list
                int val = list.removeFirst();

                System.out.println("Consumer" + thread + " consumed-" + val);

                // wake up producer thread
                notify();

                // and sleep
                Thread.sleep(1000);
            }
        }
    }
}

What am I missing? Thank you.

Upvotes: 0

Views: 2687

Answers (6)

user2023577

Reputation: 2101

First of all, you need to use notifyAll(), not notify(). With notify(), one consumer may wake the other consumer instead of the producer, and the producer would then never wake.
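
A minimal sketch of the notifyAll() change, not the asker's exact code. The class name NotifyAllBuffer, the STOP marker and the finite runDemo helper are assumptions added here so that the example terminates; the question's loops run forever. The calling thread plays the producer.

```java
import java.util.LinkedList;

class NotifyAllBuffer {
    static final int STOP = -1; // hypothetical end-of-stream marker, not in the question

    private final LinkedList<Integer> list = new LinkedList<>();
    private final int capacity = 2;

    // Producer side: wait while the list is full, then wake every waiting thread.
    public synchronized void put(int value) throws InterruptedException {
        while (list.size() == capacity) {
            wait();
        }
        list.add(value);
        notifyAll(); // every woken thread re-checks its own while condition
    }

    // Consumer side: wait while the list is empty, then wake every waiting thread.
    public synchronized int take() throws InterruptedException {
        while (list.isEmpty()) {
            wait();
        }
        int value = list.removeFirst();
        notifyAll();
        return value;
    }

    // Hypothetical finite demo: produces `items` values for two consumers
    // and returns how many values each consumer took.
    static int[] runDemo(int items) {
        NotifyAllBuffer buffer = new NotifyAllBuffer();
        int[] counts = new int[2];
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int id = i;
            consumers[i] = new Thread(() -> {
                try {
                    while (buffer.take() != STOP) {
                        counts[id]++;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }
        try {
            for (int v = 0; v < items; v++) {
                buffer.put(v);
            }
            buffer.put(STOP); // one marker per consumer
            buffer.put(STOP);
            for (Thread c : consumers) {
                c.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = runDemo(10);
        System.out.println("Consumer1 took " + counts[0] + ", Consumer2 took " + counts[1]);
    }
}
```

How the ten values are split between the two consumers is up to the scheduler and may differ from run to run; notifyAll() only guarantees that no waiting thread is left asleep by a misdirected wake-up.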

Second, the data isn't sent to two lists but to only one, so the consumers compete for items from the same place. Java has never promised predictable thread scheduling around sleep/wait/synchronized, so having the same single consumer wake repeatedly is within spec.

For fair locking and waking, use ReentrantLock(true).
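
A rough sketch of what a fair-lock version could look like, assuming a hypothetical FairLockBuffer class with one Condition per role. The finite runDemo helper is also an assumption, added so the example terminates; the calling thread acts as the producer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FairLockBuffer {
    // true = fair policy: the lock favors the longest-waiting thread
    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<Integer> items = new ArrayDeque<>();
    private final int capacity;

    FairLockBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(value);
            notEmpty.signal(); // wakes a consumer, never another producer
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            int value = items.removeFirst();
            notFull.signal(); // wakes the producer, never another consumer
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical finite demo: the calling thread produces 2 * perConsumer values,
    // two consumer threads take perConsumer values each. Returns the sum consumed.
    static long runDemo(int perConsumer) {
        FairLockBuffer buffer = new FairLockBuffer(2);
        AtomicLong sum = new AtomicLong();
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int id = i + 1;
            consumers[i] = new Thread(() -> {
                try {
                    for (int n = 0; n < perConsumer; n++) {
                        int v = buffer.take();
                        System.out.println("Consumer" + id + " consumed-" + v);
                        sum.addAndGet(v);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }
        try {
            for (int v = 0; v < 2 * perConsumer; v++) {
                buffer.put(v);
            }
            for (Thread c : consumers) {
                c.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }
}
```

Using separate conditions means a consumer can never wake the other consumer by mistake. Fairness here only concerns the order in which waiting threads are granted the lock; it does not promise strict alternation between the two consumers.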

Upvotes: 0

JineshEP

Reputation: 748

The t1.join(), t2.join() and t3.join() calls only make the main thread wait for the producer and consumer threads to finish. Here every thread runs in an infinite while loop, so the join calls make no difference. Also, a thread cannot reach wait() until it has entered its synchronized block, and which synchronized block runs first depends on which thread acquires the lock first.

Upvotes: 0

Ruslan Akhundov

Reputation: 2216

The wait/notify mechanism isn't fair: if two threads are waiting on the same monitor, either of them may be woken when you call notify(). That can lead to starvation.

So in your case, suppose the first notification wakes the first consumer. After finishing its work it calls wait() again, so the second time the producer calls notify() there are again two consumers waiting, and there is no guarantee that the other consumer will be woken; it could be either of them.

This problem goes away if you decrease the Thread.sleep duration in the producer so that it is shorter than the consumers'. Actually, it may not even be a problem: in your case the consumers' throughput equals the producer's, so you don't really need a second consumer. That is rare in real life, though, so to emulate a case where both consumers are working you should increase the producer's throughput.

However, in my opinion you should think twice before using a mechanism as low-level as wait/notify. Take a look at BlockingQueue, for example, or the other concurrency utilities in java.util.concurrent. For example, you can make an ArrayBlockingQueue fair:

Javadoc: This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.

So instead of the list you will have this queue. Calling take() on it either returns the next element or, if the queue is empty, blocks your thread until a new element arrives. Setting the fair flag to true means that waiting threads are granted access in FIFO order.

So your code will look like:

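import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProdCons {
    // Create a queue shared by producer and consumers.
    // Capacity is 2; "true" enables the fair (FIFO) policy.
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2, true);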

    // Function called by producer thread
    public void produce() throws InterruptedException {
        int value = 0;
        while (true) {
            System.out.println("Producer produced-" + value);

            // to insert the jobs in the queue
            // will block in case there is no more space in a queue
            queue.put(value++);
            // and sleep
            Thread.sleep(500);
        }
    }

    // Function called by consumer thread
    public void consume(int thread) throws InterruptedException {
        while (true) {
            // retrieve the first job in the queue;
            // blocks while the queue is empty
            int val = queue.take();

            System.out.println("Consumer" + thread + " consumed-"
                    + val);

            // and sleep
            Thread.sleep(1000);
        }
    }
}

You may also be interested in this article explaining starvation and wait/notify fairness: http://tutorials.jenkov.com/java-concurrency/starvation-and-fairness.html

Upvotes: 2

Md Ayub Ali Sarker

Reputation: 11557

I would solve this problem a different way, using Java Message Service (JMS) publish/subscribe. The publish/subscribe messaging domain is a one-to-many model: one publisher sends a message to a topic, and all active subscribers receive it through that topic. It is simple and easy to implement. Details are here: https://howtodoinjava.com/jms/jms-java-message-service-tutorial/
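
To make the one-to-many idea concrete without a JMS provider, here is a plain-Java sketch. It does not use the JMS API: SimpleTopic is a hypothetical class that imitates a topic by giving each subscriber its own queue, so every subscriber sees every message. Note that this differs from the question's setup, where the two consumers share the items between them.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a pub/sub topic; not part of JMS.
class SimpleTopic {
    private final List<BlockingQueue<Integer>> subscribers = new CopyOnWriteArrayList<>();

    // Each subscriber gets its own private queue.
    BlockingQueue<Integer> subscribe() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        subscribers.add(queue);
        return queue;
    }

    // Deliver a copy of the message to every current subscriber.
    void publish(int message) {
        for (BlockingQueue<Integer> queue : subscribers) {
            queue.add(message); // queues are unbounded, so add() does not fail on capacity
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleTopic topic = new SimpleTopic();
        BlockingQueue<Integer> consumer1 = topic.subscribe();
        BlockingQueue<Integer> consumer2 = topic.subscribe();

        topic.publish(0);
        topic.publish(1);

        // Both consumers receive both messages.
        System.out.println("Consumer1 got " + consumer1.take() + ", " + consumer1.take());
        System.out.println("Consumer2 got " + consumer2.take() + ", " + consumer2.take());
    }
}
```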

Upvotes: 0

daniu

Reputation: 14999

To illustrate my comment about not using wait/notify, here's a producer/consumer with a BlockingQueue. Sorry if that doesn't actually answer your question about why the code doesn't work.

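import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    static final AtomicInteger value = new AtomicInteger();

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        // Producer: put() blocks while the queue is full
        Thread producer = new Thread(() -> {
            try {
                while (true) {
                    queue.put(value.getAndIncrement());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Consumers: take() blocks while the queue is empty
        Runnable consumer1 = () -> {
            try {
                while (true) {
                    System.out.println("Consumer 1 consuming " + queue.take());
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable consumer2 = () -> {
            try {
                while (true) {
                    System.out.println("Consumer 2 consuming " + queue.take());
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        new Thread(consumer1).start();
        new Thread(consumer2).start();
    }
}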

Side note, I usually wouldn't even create Thread objects directly but use an ExecutorService instead, but that's beside the point.
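
For completeness, a rough sketch of the ExecutorService variant mentioned in the side note. The ExecutorDemo class, the pool size of three, the -1 stop marker and the finite item count are all assumptions made here so that the example shuts down cleanly.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorDemo {
    static final int STOP = -1; // hypothetical end-of-stream marker

    // Runs one producer and two consumers on a pool; returns how many items were consumed.
    static int run(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(3);

        // Returning null makes the lambda a Callable, which may throw InterruptedException.
        pool.submit(() -> {
            for (int i = 0; i < items; i++) {
                queue.put(i);
            }
            queue.put(STOP); // one marker per consumer
            queue.put(STOP);
            return null;
        });

        for (int id = 1; id <= 2; id++) {
            final int consumerId = id;
            pool.submit(() -> {
                int v;
                while ((v = queue.take()) != STOP) {
                    System.out.println("Consumer " + consumerId + " consuming " + v);
                    consumed.incrementAndGet();
                }
                return null;
            });
        }

        pool.shutdown(); // accept no new tasks; already submitted tasks keep running
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("Total consumed: " + run(10));
    }
}
```

The pool owns the threads, so there are no Thread objects to start or join by hand; shutdown() followed by awaitTermination() takes the place of the join calls.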

Upvotes: 0

Doug

Reputation: 709

Each of your threads is synchronized on itself (this), which will be different for each thread, so it won't prevent them from operating at the same time. Since they're (supposed to be) manipulating a list shared across the threads, they should probably synchronize on that list (or some other shared lock object). And, more problematically, it looks like each thread creates its own List - they won't share lists. The List should either be a static (class) list, or it should be passed in to each thread.

Upvotes: -1
