Yoda
Yoda

Reputation: 18068

Consumers producer multithreading consumers are not dying

I have a standard producer consumer problem. Producer puts data into the stack(buffer) consumers take it. The problem is that consumers are not dying(not always) when the producer ends producing the data.

I would like to make consumer die after producer ends his for loop

for(int i = 0; i < 10; i++){
        try{
    //      sleep((int)(Math.random() * 1));                
        }catch(Exception e){e.printStackTrace();}
        b.put((int) (Math.random()* 10));
        System.out.println("i = " + i);
    }
    b.stop();

so then I call b.stop() which changes running field in Buffer to false and notifiesAll()

End then I get:

i = 9 // number of iteration this is 10th iteration
Consumer 2.: no data to take. I wait.  Memory: 0
Consumer 1.: no data to take. I wait.  Memory: 0
Consumer 3.: no data to take. I wait.  Memory: 0

they should die then, so I made method stop() but it did not work.

Code is running please check it

import java.util.Stack;


public class Buffer {
private static int SIZE = 4;
private int i;//number of elements in buffer
public Stack<Integer> stack;
private volatile boolean running;
    public Buffer() {
        stack = new Stack<>();
        running = true;
        i = 0;
    }
    synchronized public void put(int val){
        while (i >= SIZE) {
            try {
                System.out.println("Buffer full, producer waits");
                wait();
            } catch (InterruptedException exc) {
                exc.printStackTrace();
            }
        }   
        stack.push(val);//txt = s;
        i++;
        System.out.println("Producer inserted " + val + " memory: " + i);
        if(i - 1 == 0)
            notifyAll();
        System.out.println(stack);
    }

    public synchronized Integer get(Consumer c) {
        while (i == 0) {
            try {
                System.out.println(c + ": no data to take. I wait.  Memory: " + i);
                wait();
            } catch (InterruptedException exc) {
                exc.printStackTrace();
            }
        }   
        if(running){
            int data = stack.pop();
            i--;    
            System.out.println(c+  ": I took: " + data +" memory: " +  i);
            System.out.println(stack);
            if(i + 1 == SIZE){//if the buffer was full so the producer is waiting
                notifyAll();
                System.out.println(c +  "I notified producer about it");
        }
        return data;}
        else 
            return null;
    }

    public boolean isEmpty(){
        return i == 0;
    }
    public synchronized void stop(){//I THOUGH THIS WOULD FIX IT~!!!!!!!!!!!!!!
        running = false;
        notifyAll();
    }
    public boolean isRunning(){
        return running;
    }

}

public class Producer extends Thread {
private Buffer b;
    public Producer(Buffer b) {
        this.b = b;
    }

    public void run(){
        for(int i = 0; i < 10; i++){
            try{
        //      sleep((int)(Math.random() * 1));                
            }catch(Exception e){e.printStackTrace();}
            b.put((int) (Math.random()* 10));
            System.out.println("i = " + i);
        }
        b.stop();
    }

}

public class Consumer extends Thread {
    Buffer b;
    int nr;
    static int NR = 0;

    public Consumer(Buffer b) {
        this.b = b;
        nr = ++NR;
    }

    public void run() {
        Integer i = b.get(this);
        while (i != null) {
            System.out.println(nr + " I received : " + i);
            i = b.get(this);
        }
        System.out.println("Consumer " + nr + " is dead");
    }

    public String toString() {
        return "Consumer " + nr + ".";
}

}

public class Main {

    public static void main(String[] args) {

        Buffer b = new Buffer();
        Producer p = new Producer(b);
        Consumer c1 = new Consumer(b);
        Consumer c2 = new Consumer(b);
        Consumer c3 = new Consumer(b);  
        p.start();
        c1.start();c2.start();c3.start();

    }

}

Upvotes: 1

Views: 194

Answers (4)

AHMAD SUMRAIZ
AHMAD SUMRAIZ

Reputation: 531

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadsConsolApp
{

        public sealed class ProducerConsumer
        {
            const int MagicNumber = 30;                                 // Indicates how many times to bounce between ping and pong threads 
            private Object m_lock = new Object();                       // Lock to protect counter increment 
            private Queue<int> m_queue = new Queue<int>();

            // Ctor 
            public ProducerConsumer()
            {
            }
            public void admin()
            {
                int i = 0;
                lock (m_lock)
                {
                    while (i <= MagicNumber)
                    {
                        Monitor.Wait(m_lock); //wait
                        Console.WriteLine("Controller = Produced " + i + " , Consumeed " + i);
                        Monitor.Pulse(m_lock); //release
                        i++;
                    }
                }
            }
            // Ping
            public void Producer()
            {
                int counter = 0;

                lock (m_lock)                                               // Allows only one thread at a time inside m_lock 
                {
                    while (counter <= MagicNumber)
                    {
                        Thread.Sleep(500);                               // Get data chunks from some source 
                        Monitor.Wait(m_lock);                           // Wait if the thread is busy. 'wait' will hold 
                                                                       //this loop until something else pulses it to release the wait. 
                        Console.WriteLine("producer {0}", counter);
                        m_queue.Enqueue(counter);//entring in queue
                        Monitor.Pulse(m_lock);                          // Releases consumer thread 
                        counter++;
                    }
                }
            }

            public void Consumer()
            {
                lock (m_lock)                                           // Allows only one thread at a time inside m_lock 
                {
                    Monitor.Pulse(m_lock);

                    while (Monitor.Wait(m_lock,1000))                  // Wait in the loop while producer is busy. Exit when producer times-out. 1000 = 1 second; ...
                                                                      //app will hang without this time-out value 
                    {
                        int data = m_queue.Dequeue();//dispatch from queue
                        Console.WriteLine("consumer {0}", data);
                        Monitor.Pulse(m_lock);                          // Release consumer 
                        Console.WriteLine("=====================");
                    }
                }
            }
        }

        class Program
        {
            static void Main(string[] args)
            {
                ProducerConsumer app = new ProducerConsumer();

                // Create 2 threads 
                Thread t_producer = new Thread(new ThreadStart(app.Producer));
                Thread t_consumer = new Thread(new ThreadStart(app.Consumer));
                Thread t_admin = new Thread(new ThreadStart(app.admin));

                // Start threads 
                t_admin.Start();
                t_producer.Start();
                t_consumer.Start();

                // Waith for the threads to complete 
                t_admin.Join();
                t_producer.Join();
                t_consumer.Join();

                Console.WriteLine("\nPress any key to complete the program.\n");
                Console.ReadKey(false);
            }
        }
    }

Upvotes: 0

Lokesh
Lokesh

Reputation: 7940

Your implementation has a major flaw. Modifications of variable i are not thread safe which means you can get unexplainable results. It can lead to race condition.

Upvotes: 0

AllTooSir
AllTooSir

Reputation: 49402

A minor modification in your run() for Consumer:

public void run() {
    Integer i = b.get(this);
    while (i != null && i!=10) {
        System.out.println(nr + " I received : " + i);
        i = b.get(this);
    }
    System.out.println("Consumer " + nr + " is dead");
}

Upvotes: 0

JB Nizet
JB Nizet

Reputation: 691845

When the buffer is empty, the consumer starts waiting. When it's notified, it checks if the buffer is empty, and starts waiting again if it's still empty. You should not start waiting again if the running flag has been set to false:

while (i == 0 && running) {
    ...

Upvotes: 2

Related Questions