Sagarika
Sagarika

Reputation: 95

Multithreading Synchronization not working in Java

I am trying an example of multi threading in java. There was an example on multithreading Synchronization in Java Complete reference 7th Edition. The example works fine. but when i slightly add a line to create another thread of the same class this does not work. Could some please let me know why this is happening. The example is given below. The below code is a classic exacple of producer and consumer. Where there is a single producer it works fine when i have 2 producers then it will fail. It just puts till 15 and stops.

class Q {

    int n;
    boolean valueSet = false;

    synchronized int get() {
        while (!valueSet) {
            try {
                wait();
            } catch (InterruptedException e) {
                System.out.println("InterruptedException caught");
            }
        }
        System.out.println("Got: " + n);
        valueSet = false;
        notify();
        return n;
    }

    synchronized void put(int n) {
        while (valueSet) {
            try {
                wait();
            } catch (InterruptedException e) {
                System.out.println("InterruptedException caught");
            }
        }
        this.n = n;
        valueSet = true;
        System.out.println("Put: " + n);
        notify();
    }
}

class Producer implements Runnable {

    Q q;

    Producer(Q q) {
        this.q = q;
        new Thread(this, "Producer").start();
        //new Thread(this, "Producer2").start();
    }

    public void run() {
        int i = 0;
        while (true) {
            q.put(i++);
        }
    }
}

class Consumer implements Runnable {

    Q q;

    Consumer(Q q) {
        this.q = q;
        new Thread(this, "Consumer").start();
    }

    @Override
    public void run() {
        while (true) {
            q.get();
        }
    }
}

public class PCFixed {

    public static void main(String[] args) {
        Q q = new Q();
        Producer P1 = new Producer(q);
        new Consumer(q);
        Producer P2 = new Producer(q);
        System.out.println("Press Control-C to stop.");
    }
}

Upvotes: 0

Views: 140

Answers (3)

xagyg
xagyg

Reputation: 9711

Replace each occurrence of notify with notifyAll.

Upvotes: 0

Boris the Spider
Boris the Spider

Reputation: 61128

The example of concurrency you provide uses a single boolean flag to check whether there is a signal or not.

So this is more of a Semaphore arrangement than a producer consumer arrangement. It is too simplistic to deal with an arbitrary number of Threads.

If you really want to use producer consumer you are going to need a queue that holds more than one item.

static final AtomicBoolean run = new AtomicBoolean(true);

static class Producer implements Runnable {

    final BlockingQueue<String> blockingQueue;

    public Producer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (run.get()) {
            blockingQueue.add("Value from " + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                //doesn't matter.
            }
        }
    }
}

static class Consumer implements Runnable {

    final BlockingQueue<String> blockingQueue;

    public Consumer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (run.get()) {
            final String item;
            try {
                item = blockingQueue.take();
            } catch (InterruptedException ex) {
                return;
            }
            System.out.println(item);
        }
    }
}

public static void main(String[] args) throws InterruptedException {
    final LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<>();
    final ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(new Consumer(lbq));
    for (int i = 0; i < 10; ++i) {
        executorService.submit(new Producer(lbq));
    }
    Thread.sleep(10000);
    run.set(false);
    executorService.shutdownNow();
}

This simple example uses a LinkedBlockingQueue to post events to and read events from.

The Producer puts Strings into the queue with it's own Thread name (they do this every 100ms). The Consumer takes from the queue and prints the String.

The queue is a BlockingQueue so the take method will block if the queue is empty.

You can easily change the number of Producers and Consumers by changing the loops that add items to the ExecutorService. Experiment, see how it works.

The AtomicBoolean flag allows the program to shutdown all the child processes spawned.

Upvotes: 0

Zim-Zam O&#39;Pootertoot
Zim-Zam O&#39;Pootertoot

Reputation: 18148

Q is written to only accept one value at a time. You need to change put to be a boolean method - it returns true if valueset is true and then proceeds as normal, and returns false if valueset is false and returns without doing anything. Then the methods calling put will need to keep retrying until they get a true response. This way multiple consumers can use the same Q object without interfering with each other.

A better solution if you're using multiple producers is to use a ConcurrentLinkedQueue, which is a thread-safe queue. The producers will offer integers to the queue, and the consumers will poll the queue for integers. Multiple producers can simultaneously offer integers without interfering with each other, and multiple consumers can simultaneously poll integers without interfering with each other.

Upvotes: 1

Related Questions