user2434
user2434

Reputation: 6409

Implementing producer consumer in Java

This is an implementation of producer consumer pattern for a homework. What's wrong with the below implementation. I have googled for various implementations, but I am not able to understand what went wrong in mine.

I have a shared queue

I synchronize the producer and consumer on the same lock

Implementation

Shared Queue:

 class SharedQueue{
    public static Queue<Integer>   queue  = new LinkedList<Integer>();
 }

Producer Thread :

//The producer thread
class Producer implements Runnable{
    public void run()
    {
        synchronized (SharedQueue.queue)
        {
            if(SharedQueue.queue.size() >=5)
            {
                try {
                    SharedQueue.queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Random r  = new Random();

            int x = r.nextInt(10);
            System.out.println("Inside Producer" + x);

            SharedQueue.queue.offer(x);


            SharedQueue.queue.notify();

        }
    }
}

Consumer Thread:

class Consumer implements  Runnable{
    public void run()
    {
        synchronized (SharedQueue.queue)
        {
            if(SharedQueue.queue.size() == 0)
            {
                try {
                    SharedQueue.queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
            }

            int k = SharedQueue.queue.remove();

            System.out.println("Inside consumer" + k);
        }
    }
}

The Main program

public class ProducerConsumerTest {

    public static void main(String[] args)
    {
        
        Thread p = new Thread(new Producer());
        Thread q = new Thread(new Consumer());

        p.start();
        q.start();

    }
}

Upvotes: 2

Views: 4391

Answers (6)

Eldar
Eldar

Reputation: 5237

Simply use my pattern with poison pills:

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}

Upvotes: 0

Ashish Yadav
Ashish Yadav

Reputation: 1

public class ProducerAndConsumer {
    public static void main(String a[]) {
        Resource resource = new Resource();
        Producer producer = new Producer(resource);
        Consumer consumer = new Consumer(resource);
        producer.start();
        consumer.start();

    }
}

class Resource {
    private int item = 0;
    boolean flag = true;

    public void getItem() {
        while (true) {
            synchronized (this) {
                if (!flag) {
                    try {
                        System.out.println("Consumer consume item :" + item);
                        flag = true;
                        Thread.sleep(10);
                        notify();
                        wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }

    }

    public void setItem() {
        while (true) {
            synchronized (this) {

                if (flag) {
                    try {
                        item++;
                        System.out.println("Producer creating item :" + item);
                        flag = false;
                        Thread.sleep(10);
                        notify();
                        wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }
        }
    }

}

class Producer extends Thread {
    Resource resource = null;

    Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        resource.setItem();
    }
}

class Consumer extends Thread {
    Resource resource = null;

    Consumer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        resource.getItem();
    }
}

Upvotes: 0

Sendil
Sendil

Reputation: 9

You can use ConcurrentLinkedQueue to manage shared queue for the Producer and Consumer. You can make use of ConcurrentHashMap> collection, which will help Producer to produce concurrently and also Consumer can consume concurrently and keep the generated keys by the Producer in another collection object, where Consumer can find its key and consume it from the ConcurrentHashMap>.

Upvotes: 0

lucky
lucky

Reputation: 164

Easy way to implement Producer Consumer Problem is by using semaphore.

public class Semaphore {
    int value;

    public Semaphore(int intialValue) {
        this.value = intialValue;
    }

    public synchronized void p() {
        while (value <= 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
            }
        }
        value = value - 1;
    }

    public synchronized void v() {
        value = value + 1;
        this.notify();
    }
}

public class ProducerConsumerUsingSemaphore {

    private static final int SIZE = 10;

    public static void main(String[] args) {

        Semaphore full = new Semaphore(0);
        Semaphore empty = new Semaphore(SIZE);
        Semaphore mutex = new Semaphore(1);
        Vector<Integer> sQueue = new Vector<Integer>();

        Thread producerThread = new Thread(new Runnable() {

            @Override
            public void run() {

                for (int i = 0; i < 5000; i++) {
                    empty.p();
                    mutex.p();
                    System.out.println(Thread.currentThread().getName() + " is trying to insert item " + i);
                    sQueue.add(i);
                    mutex.v();
                    full.v();
                }
            }
        });

        Thread consumerThread = new Thread(new Runnable() {

            @Override
            public void run() {
                while (true) {
                    full.p();
                    mutex.p();
                    System.out.println(Thread.currentThread().getName() + " consuming item " + sQueue.remove(0));
                    mutex.v();
                    empty.v();
                }
            }
        });

        producerThread.setName("Producer");
        consumerThread.setName("Consumer");

        consumerThread.start();
        producerThread.start();

    }
}

Upvotes: 1

Sean
Sean

Reputation: 311

I am assuming that you want this to be a endless loop of producer consumer. On top of Eng.Fouad changes, suround both synchonized blocks with:

        while (true)

and in the Consumer add a notify

        int k = SharedQueue.queue.remove(); 

        // make the producer active again
        SharedQueue.queue.notify();

        System.out.println("Inside consumer " + k);

Upvotes: 1

Eng.Fouad
Eng.Fouad

Reputation: 117685

Try replacing:

if(SharedQueue.queue.size() >= 5)

with:

while(SharedQueue.queue.size() >= 5)

and this:

if(SharedQueue.queue.size() == 0)

with:

while(SharedQueue.queue.size() == 0)

Just to re-check the condition after calling notify().

Upvotes: 4

Related Questions