Reputation: 18068
I have a standard producer consumer problem. Producer puts data into the stack(buffer) consumers take it. The problem is that consumers are not dying(not always) when the producer ends producing the data.
I would like to make consumer die after producer ends his for
loop
for(int i = 0; i < 10; i++){
try{
// sleep((int)(Math.random() * 1));
}catch(Exception e){e.printStackTrace();}
b.put((int) (Math.random()* 10));
System.out.println("i = " + i);
}
b.stop();
so then I call b.stop()
which changes running
field in Buffer
to false and notifiesAll()
End then I get:
i = 9 // number of iteration this is 10th iteration
Consumer 2.: no data to take. I wait. Memory: 0
Consumer 1.: no data to take. I wait. Memory: 0
Consumer 3.: no data to take. I wait. Memory: 0
they should die then, so I made method stop() but it did not work.
Code is running please check it
import java.util.Stack;
public class Buffer {
private static int SIZE = 4;
private int i;//number of elements in buffer
public Stack<Integer> stack;
private volatile boolean running;
public Buffer() {
stack = new Stack<>();
running = true;
i = 0;
}
synchronized public void put(int val){
while (i >= SIZE) {
try {
System.out.println("Buffer full, producer waits");
wait();
} catch (InterruptedException exc) {
exc.printStackTrace();
}
}
stack.push(val);//txt = s;
i++;
System.out.println("Producer inserted " + val + " memory: " + i);
if(i - 1 == 0)
notifyAll();
System.out.println(stack);
}
public synchronized Integer get(Consumer c) {
while (i == 0) {
try {
System.out.println(c + ": no data to take. I wait. Memory: " + i);
wait();
} catch (InterruptedException exc) {
exc.printStackTrace();
}
}
if(running){
int data = stack.pop();
i--;
System.out.println(c+ ": I took: " + data +" memory: " + i);
System.out.println(stack);
if(i + 1 == SIZE){//if the buffer was full so the producer is waiting
notifyAll();
System.out.println(c + "I notified producer about it");
}
return data;}
else
return null;
}
public boolean isEmpty(){
return i == 0;
}
public synchronized void stop(){//I THOUGH THIS WOULD FIX IT~!!!!!!!!!!!!!!
running = false;
notifyAll();
}
public boolean isRunning(){
return running;
}
}
public class Producer extends Thread {
private Buffer b;
public Producer(Buffer b) {
this.b = b;
}
public void run(){
for(int i = 0; i < 10; i++){
try{
// sleep((int)(Math.random() * 1));
}catch(Exception e){e.printStackTrace();}
b.put((int) (Math.random()* 10));
System.out.println("i = " + i);
}
b.stop();
}
}
public class Consumer extends Thread {
Buffer b;
int nr;
static int NR = 0;
public Consumer(Buffer b) {
this.b = b;
nr = ++NR;
}
public void run() {
Integer i = b.get(this);
while (i != null) {
System.out.println(nr + " I received : " + i);
i = b.get(this);
}
System.out.println("Consumer " + nr + " is dead");
}
public String toString() {
return "Consumer " + nr + ".";
}
}
public class Main {
public static void main(String[] args) {
Buffer b = new Buffer();
Producer p = new Producer(b);
Consumer c1 = new Consumer(b);
Consumer c2 = new Consumer(b);
Consumer c3 = new Consumer(b);
p.start();
c1.start();c2.start();c3.start();
}
}
Upvotes: 1
Views: 194
Reputation: 531
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ThreadsConsolApp
{
public sealed class ProducerConsumer
{
const int MagicNumber = 30; // Indicates how many times to bounce between ping and pong threads
private Object m_lock = new Object(); // Lock to protect counter increment
private Queue<int> m_queue = new Queue<int>();
// Ctor
public ProducerConsumer()
{
}
public void admin()
{
int i = 0;
lock (m_lock)
{
while (i <= MagicNumber)
{
Monitor.Wait(m_lock); //wait
Console.WriteLine("Controller = Produced " + i + " , Consumeed " + i);
Monitor.Pulse(m_lock); //release
i++;
}
}
}
// Ping
public void Producer()
{
int counter = 0;
lock (m_lock) // Allows only one thread at a time inside m_lock
{
while (counter <= MagicNumber)
{
Thread.Sleep(500); // Get data chunks from some source
Monitor.Wait(m_lock); // Wait if the thread is busy. 'wait' will hold
//this loop until something else pulses it to release the wait.
Console.WriteLine("producer {0}", counter);
m_queue.Enqueue(counter);//entring in queue
Monitor.Pulse(m_lock); // Releases consumer thread
counter++;
}
}
}
public void Consumer()
{
lock (m_lock) // Allows only one thread at a time inside m_lock
{
Monitor.Pulse(m_lock);
while (Monitor.Wait(m_lock,1000)) // Wait in the loop while producer is busy. Exit when producer times-out. 1000 = 1 second; ...
//app will hang without this time-out value
{
int data = m_queue.Dequeue();//dispatch from queue
Console.WriteLine("consumer {0}", data);
Monitor.Pulse(m_lock); // Release consumer
Console.WriteLine("=====================");
}
}
}
}
class Program
{
static void Main(string[] args)
{
ProducerConsumer app = new ProducerConsumer();
// Create 2 threads
Thread t_producer = new Thread(new ThreadStart(app.Producer));
Thread t_consumer = new Thread(new ThreadStart(app.Consumer));
Thread t_admin = new Thread(new ThreadStart(app.admin));
// Start threads
t_admin.Start();
t_producer.Start();
t_consumer.Start();
// Waith for the threads to complete
t_admin.Join();
t_producer.Join();
t_consumer.Join();
Console.WriteLine("\nPress any key to complete the program.\n");
Console.ReadKey(false);
}
}
}
Upvotes: 0
Reputation: 7940
Your implementation has a major flaw. Modifications of variable i are not thread safe which means you can get unexplainable results. It can lead to race condition.
Upvotes: 0
Reputation: 49402
A minor modification in your run()
for Consumer
:
public void run() {
Integer i = b.get(this);
while (i != null && i!=10) {
System.out.println(nr + " I received : " + i);
i = b.get(this);
}
System.out.println("Consumer " + nr + " is dead");
}
Upvotes: 0
Reputation: 691845
When the buffer is empty, the consumer starts waiting. When it's notified, it checks if the buffer is empty, and starts waiting again if it's still empty. You should not start waiting again if the running
flag has been set to false:
while (i == 0 && running) {
...
Upvotes: 2