Qasim

Reputation: 9658

Issue in implementing a blocking queue in Java

I am implementing a thread-safe blocking queue in Java, but the threads appear to run one after another rather than in parallel. Can anyone help me find out what I am doing wrong? My code is as follows:

package com.example;

import java.util.LinkedList;
import java.util.List;

class Producer implements Runnable{

    BlockingQueue blockingQueue;

    public Producer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        int counter = 0;
        while (true)
        {
            try
            {
                blockingQueue.enqueue(counter++);
            }
            catch (InterruptedException ex)
            {
                ex.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{

    BlockingQueue blockingQueue;

    public Consumer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {


        while (true)
        {
            try
            {
                blockingQueue.dequeue();
            }
            catch (InterruptedException ex)
            {
                ex.printStackTrace();
            }
        }
    }
}

public class Test{
    public static void main(String[] args) {

        BlockingQueue blockingQueue = new BlockingQueue(10);
        Thread producer = new Thread(new Producer(blockingQueue), "Prod");
        Thread consumer = new Thread(new Consumer(blockingQueue), "Cons");
        producer.start();
        consumer.start();
    }
}

class BlockingQueue {

    private List queue = new LinkedList();
    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }


    public synchronized void enqueue(Object item)
            throws InterruptedException  {

        while(this.queue.size() == this.limit) {
            System.out.println("Wait Enque : "+Thread.currentThread().getName());
            wait();
        }

        Thread.sleep(1000);
        System.out.println("Add Item : " + Thread.currentThread().getName());
        this.queue.add(item);
        notifyAll();
    }


    public synchronized Object dequeue()
            throws InterruptedException{

        while(this.queue.size() == 0){
            System.out.println("Wait Denque : "+Thread.currentThread().getName());
            wait();
        }

        Thread.sleep(1000);
        System.out.println("Remove Item : " + Thread.currentThread().getName());
        notifyAll();
        return this.queue.remove(0);
    }

}

I am new to multithreading.

This is the output I am getting:

Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Wait Enque : Prod
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Wait Denque : Cons
Add Item : Prod
Add Item : Prod
Add Item : Prod

Upvotes: 2

Views: 346

Answers (2)

Nicolas Filotto

Reputation: 45005

You get this behavior because synchronized blocks and methods (also called intrinsic locks) give no guarantee about which waiting thread acquires the lock next: the intrinsic lock is an unfair lock. In your code, the thread that has just released the lock usually re-acquires it straight away, before the other thread gets a chance. To get the behavior you expect, use a fair explicit lock, which favors the thread that has been waiting the longest. Here is how to create a fair explicit lock with its corresponding condition:

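import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BlockingQueue {

    // Passing true requests a fair lock, which favors the longest-waiting thread.
    private final Lock lock = new ReentrantLock(true);
    private final Condition condition = lock.newCondition();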
    ...

The code of enqueue will then be:

public void enqueue(Object item)
    throws InterruptedException  {

    lock.lock(); // acquire before the try so finally never unlocks a lock that is not held
    try {
        while(this.queue.size() == this.limit) {
            System.out.println("Wait Enque : "+Thread.currentThread().getName());
            condition.await();
        }

        Thread.sleep(1000);
        System.out.println("Add Item : " + Thread.currentThread().getName());
        this.queue.add(item);
        condition.signalAll();
    } finally {
        lock.unlock();
    }
}

And finally the code of dequeue will be:

public Object dequeue()
    throws InterruptedException{

    lock.lock(); // acquire before the try so finally never unlocks a lock that is not held
    try {
        while(this.queue.size() == 0){
            System.out.println("Wait Denque : "+Thread.currentThread().getName());
            condition.await();
        }

        Thread.sleep(1000);
        System.out.println("Remove Item : " + Thread.currentThread().getName());
        condition.signalAll();
        return this.queue.remove(0);
    } finally {
        lock.unlock();
    }
}

The output is then:

Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
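
For reference, the snippets above can be assembled into one self-contained class. This is only a sketch: the class name FairBlockingQueue is made up for illustration, the fields mirror the ones in the question, and the Thread.sleep and println calls are left out so that only the locking logic remains.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical name; assembled from the snippets above for illustration.
class FairBlockingQueue {

    private final List<Object> queue = new LinkedList<>();
    private final int limit;
    // 'true' requests a fair lock, which favors the longest-waiting thread.
    private final Lock lock = new ReentrantLock(true);
    private final Condition condition = lock.newCondition();

    FairBlockingQueue(int limit) {
        this.limit = limit;
    }

    public void enqueue(Object item) throws InterruptedException {
        lock.lock(); // acquired before the try, released in finally
        try {
            while (queue.size() == limit) {
                condition.await(); // releases the lock while waiting for space
            }
            queue.add(item);
            condition.signalAll(); // wake threads waiting for an item
        } finally {
            lock.unlock();
        }
    }

    public Object dequeue() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await(); // releases the lock while waiting for an item
            }
            Object item = queue.remove(0);
            condition.signalAll(); // wake threads waiting for space
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```

A single condition is shared by producers and consumers here, as in the snippets above, which is why signalAll() is used and each waiter re-checks its loop condition after waking.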

Upvotes: 0

Kedar Mhaswade

Reputation: 4695

Your implementation of BlockingQueue demonstrates the "Sleeping at Work" pattern that should be avoided.

For some reason, you call Thread.sleep inside your data structure's synchronized methods, enqueue and dequeue. That is not required, and it is harmful: unlike wait(), Thread.sleep() does not release the monitor, so the other thread stays locked out for the whole second. At a minimum, those methods should use the shared mutable state (i.e. your queue) in a thread-safe manner. Everything in them except the call to Thread.sleep() is a good first attempt:

public synchronized void enqueue(Object item)
        throws InterruptedException {

    while (this.queue.size() == this.limit) {
        System.out.println("Wait Enque : "+Thread.currentThread().getName());
        wait();
    }

    System.out.println("Add Item : " + item.toString() + " " + Thread.currentThread().getName());
    this.queue.add(item);
    notifyAll();
}

and similarly for dequeue.
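
To spell out the "similarly", here is a sketch of both methods without the sleep. The class name NoSleepBlockingQueue is hypothetical, the fields are assumed to match the ones in the question, and the item is removed before notifyAll() so that it can be included in the log line.

```java
import java.util.LinkedList;
import java.util.List;

// Hypothetical class name; fields mirror the question's BlockingQueue.
class NoSleepBlockingQueue {

    private final List<Object> queue = new LinkedList<>();
    private final int limit;

    NoSleepBlockingQueue(int limit) {
        this.limit = limit;
    }

    public synchronized void enqueue(Object item) throws InterruptedException {
        while (queue.size() == limit) {
            System.out.println("Wait Enque : " + Thread.currentThread().getName());
            wait(); // releases the monitor until a consumer makes room
        }
        System.out.println("Add Item : " + item + " " + Thread.currentThread().getName());
        queue.add(item);
        notifyAll();
    }

    public synchronized Object dequeue() throws InterruptedException {
        while (queue.isEmpty()) {
            System.out.println("Wait Denque : " + Thread.currentThread().getName());
            wait(); // releases the monitor until a producer adds an item
        }
        Object item = queue.remove(0);
        System.out.println("Remove Item : " + item + " " + Thread.currentThread().getName());
        notifyAll();
        return item;
    }
}
```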

Your threads, however, are being extremely greedy :-). Maybe you should actually do something with the dequeued item inside the run method of your threads:

    while (true)
    {
        try
        {
            Object deq = blockingQueue.dequeue();
            Thread.sleep(1000); // sleeping to simulate using the de-queued item

        }
        catch (InterruptedException ex)
        {
            ex.printStackTrace();
        }
    }

In effect, all I did was take the sleeping part out of your data structure's methods. I then get the following interleaved output, which is what I'd expect:

Add Item : 0 Prod
Remove Item : 0 Cons
Add Item : 1 Prod
Remove Item : 1 Cons
Add Item : 2 Prod
Add Item : 3 Prod
Remove Item : 2 Cons
Add Item : 4 Prod
Add Item : 5 Prod

Of course, there are several improvements that one can suggest:

  1. Make all your fields final.
  2. Use something like CountDownLatch as a start gate, so that both threads begin working at the same moment (some fairness at startup).
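
One possible reading of the CountDownLatch suggestion is a start gate: every worker waits on the same latch and is released at once. The sketch below assumes that reading; StartGateDemo and gated are made-up names. Note that this only evens out the start and does not make the queue's lock itself fair.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper, not part of the question's code.
class StartGateDemo {

    // Starts a thread that runs 'task' only after the shared gate opens.
    static Thread gated(CountDownLatch startGate, Runnable task, String name) {
        Thread t = new Thread(() -> {
            try {
                startGate.await(); // blocks until the latch count reaches zero
                task.run();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt status and exit
            }
        }, name);
        t.start();
        return t;
    }
}
```

With a latch created as new CountDownLatch(1), the producer and consumer threads can both be created through gated(...), and a single countDown() call then releases them together.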

Take a look at java.util.concurrent.LinkedBlockingQueue for some insights as well!
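
As a small illustration of the library class, here is a sketch of a producer and consumer built on java.util.concurrent.LinkedBlockingQueue. LibraryQueueDemo and transfer are hypothetical names; put and take are the blocking operations of the standard BlockingQueue interface.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class using the JDK's bounded blocking queue.
class LibraryQueueDemo {

    // Moves 'count' integers from a producer thread to the caller and returns their sum.
    static int transfer(int count, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt status and stop
            }
        }, "Prod");
        producer.start();

        int sum = 0;
        for (int i = 0; i < count; i++) {
            sum += queue.take(); // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }
}
```

The queue does all the waiting and signalling internally, so neither thread needs synchronized, wait or notifyAll of its own.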

Upvotes: 2
