SkyAo
SkyAo

Reputation: 687

Producer/Consumer Model using Java(Synchronized) but always run the same thread

Everything looks like well but when I run, the result like:

Start Thread(1): Pro2

Start Thread(2): Pro1

Start Thread(1): Con1

Start Thread(2): Con2

Pro2 Produce: EWlmJdi2KK

Left:

Pro2:EWlmJdi2KK

---Left: 1---

Con2 Consume: EWlmJdi2KK

Left:

---Left: 0---

Pro2 Produce: Nx7QPyG7vs

Left:

Pro2:Nx7QPyG7vs

---Left: 1---

Pro2 Produce: xl85Zwr80a

Left: Pro2:Nx7QPyG7vs Pro2:xl85Zwr80a

---Left: 2---

Always the same Producer & Consumer in a running time.

Here is My Code: Main.java:

package com.producer;

import org.apache.commons.lang3.RandomStringUtils;

public class Main {

    public static void main(String[] args) {
        ThreadSynchronized semaphore = new ThreadSynchronized();
        ThreadSynchronized.Producer pro = semaphore.new Producer("1");
        ThreadSynchronized.Consumer con = semaphore.new Consumer("2");
        new Thread(pro, "Pro2").start();
        new Thread(pro, "Pro1").start();
        new Thread(con, "Con1").start();
        new Thread(con, "Con2").start();

    }
}

ThreadSynchronized.java:

package com.producer;

import java.util.LinkedList;

/**
 * Created by SkyAo on 15/10/18.
 */
public class ThreadSynchronized {
    private static int pid;
    private static int cid;
    public LinkedList<Item> items = new LinkedList<>();
    private Item temp;
    private int item;
    private boolean flag = false;

    class Producer implements Runnable {
        private String name;
        private int id;

        public Producer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            this.id = ++pid;
            System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
            while (true)
                this.produce();
        }

        private synchronized void produce() {
            try {

                Thread.sleep((int)(Math.random()*5000)+3000);
                if (items.size() < 5) {
                    items.add(new Item(this.id, Thread.currentThread().getName()));
                    temp = items.getLast();
                    System.out.println(temp.sourceName + " Produce: " + temp.semi);

                    System.out.println("Left: ");

                    for (Item item : items) {
                        System.out.println(item.sourceName + ":" + item.semi);
                    }

                    System.out.println("---Left: " + items.size() + "---");
                } else {
                    super.wait();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                super.notifyAll();
            }

        }


    }

    class Consumer implements Runnable {
        private String name;
        private int id;

        public Consumer(String name) {
            this.name = name;
            //this.id = ++cid;
        }

        @Override
        public void run() {
            this.id = ++cid;
            System.out.println("Start Thread(" + this.id + "): " + Thread.currentThread().getName());
            while (true)
                this.consume();

        }

        private synchronized void consume() {
            try {
                Thread.sleep((int) (Math.random() * 5000) + 3000);
                if (items.size() > 0) {
                    temp = items.removeFirst();
                    System.out.println(Thread.currentThread().getName() + " Consume: " + temp.semi);
                    System.out.println("Left: ");

                    for (Item item : items) {
                        System.out.println(item.sourceName + ":" + item.semi);
                    }

                    System.out.println("---Left: " + items.size() + "---");
                } else {
                    super.wait();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                super.notifyAll();
            }
        }
    }
}

I don't know what happened and how to fix it,Thanks for help.

Upvotes: 2

Views: 1367

Answers (3)

Andy Dalton
Andy Dalton

Reputation: 242

You've structured your solution in an unusual way. Typically the producer and consumer are not themselves synchronized, but access to the resource that they're sharing is synchronized.

Consider this example. Here, an instance of MyQueue is the shared resource – it is the thing that has synchronized methods. The producer and consumer themselves are not synchronized.

import java.util.LinkedList;
import java.util.List;
import java.util.Random;

class MyQueue {
    private int capacity;
    private List<Integer> queue = new LinkedList<>();

    public MyQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void enqueue(int item) throws InterruptedException {
        while (queue.size() == this.capacity) {
            wait();
        }

        System.out.println("Thread " + Thread.currentThread().getName() +
                           " producing " + item);
        queue.add(item);

        if (queue.size() == 1) {
            notifyAll();
        }
    }

    public synchronized int dequeue() throws InterruptedException {
        int item;

        while (queue.size() == 0) {
            wait();
        }

        item = queue.remove(0);

        System.out.println("Thread " + Thread.currentThread().getName() +
                           " consuming " + item);

        if (queue.size() == (capacity - 1)) {
            notifyAll();
        }

        return item;
    }
}

public class ProducerConsumer {
    private static class Producer implements Runnable {
        private MyQueue queue;
        private Random random = new Random();

        public Producer(MyQueue queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                for (;;) {
                    queue.enqueue(random.nextInt());
                    Thread.sleep((int)(Math.random() * 3000) + 1000);
                }
            } catch (InterruptedException ex) {
                System.out.println(Thread.currentThread().getName() +
                        " interrupted");
            }
        }
    }

    private static class Consumer implements Runnable {
        private MyQueue queue;

        public Consumer(MyQueue queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                for (;;) {
                    queue.dequeue();
                    Thread.sleep((int)(Math.random() * 5000) + 3000);
                }
            } catch (InterruptedException ex) {
                System.out.println(Thread.currentThread().getName() +
                        " interrupted");
            }
        }
    }

    public static void main(String[] args) {
        MyQueue queue = new MyQueue(10);

        new Thread(new Producer(queue), "Producer 1").start();
        new Thread(new Producer(queue), "Producer 2").start();
        new Thread(new Consumer(queue), "Consumer 1").start();
        new Thread(new Consumer(queue), "Consumer 2").start();
    }
}

Sample output:

$ java ProducerConsumer
Thread Producer 1 producing 1380029295
Thread Consumer 1 consuming 1380029295
Thread Producer 2 producing 1449212482
Thread Consumer 2 consuming 1449212482
Thread Producer 2 producing -1845586946
Thread Producer 1 producing -1072820603
Thread Producer 1 producing 1224861300
Thread Producer 2 producing 507431251
Thread Consumer 2 consuming -1845586946
Thread Consumer 1 consuming -1072820603
Thread Producer 2 producing -1305630628
Thread Producer 1 producing 1413011254
Thread Producer 2 producing -222621018
Thread Consumer 2 consuming 1224861300
Thread Producer 1 producing -1628544536
Thread Consumer 1 consuming 507431251

Upvotes: 1

Roger Gustavsson
Roger Gustavsson

Reputation: 1709

It's tricky to solve Producer/Consumer problems with notify/wait. My suggestion is that you look into "queues". More specifically one of the sub classes of AbstractQueue.

You would start by creating a suitable queue. Then create Producer objects. Each producer object implements Runnable. When you create the object you pass the queue as a parameter. You start your producer objects as new threads.

You do the same thing with Consumer objects. They also implements Runnable and are passed the queue as a parameter when created. When all Producer and Consumer threads are running, your main program is done.

It's the same queue object that is passed to all Producer and Consumer threads. Synchronization is handled by the queue.

The code in the Producer object writes to, or puts entries on, the queue -- while the code in the Consumers read, or pulls entries, from the queue.

See this blog for an example.

Upvotes: 1

Tom Koptel
Tom Koptel

Reputation: 319

you are tackling a tricky design problem in Java vanilla way. This approach is error prone and not reliable. I would suggest using ready solutions instead. Though, for you case I think you will find this article to be helpful.

Upvotes: 0

Related Questions