Reputation: 9658
I am implementing a thread-safe Blocking Queue in java but it appears threads are running one after another and not in parallel. Can anyone help me to find out what am I doing wrong? My code is as follows:
package com.example;
import java.util.LinkedList;
import java.util.List;
class Producer implements Runnable{
BlockingQueue blockingQueue;
public Producer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
int counter = 0;
while (true)
{
try
{
blockingQueue.enqueue(counter++);
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
BlockingQueue blockingQueue;
public Consumer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true)
{
try
{
blockingQueue.dequeue();
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
}
public class Test{
public static void main(String[] args) {
BlockingQueue blockingQueue = new BlockingQueue(10);
Thread producer = new Thread(new Producer(blockingQueue), "Prod");
Thread consumer = new Thread(new Consumer(blockingQueue), "Cons");
producer.start();
consumer.start();
}
}
class BlockingQueue {
private List queue = new LinkedList();
private int limit = 10;
public BlockingQueue(int limit){
this.limit = limit;
}
public synchronized void enqueue(Object item)
throws InterruptedException {
while(this.queue.size() == this.limit) {
System.out.println("Wait Enque : "+Thread.currentThread().getName());
wait();
}
Thread.sleep(1000);
System.out.println("Add Item : " + Thread.currentThread().getName());
this.queue.add(item);
notifyAll();
}
public synchronized Object dequeue()
throws InterruptedException{
while(this.queue.size() == 0){
System.out.println("Wait Denque : "+Thread.currentThread().getName());
wait();
}
Thread.sleep(1000);
System.out.println("Remove Item : " + Thread.currentThread().getName());
notifyAll();
return this.queue.remove(0);
}
}
I am new to multithreading.
This is the output I am getting:
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Add Item : Prod
Wait Enque : Prod
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Remove Item : Cons
Wait Denque : Cons
Add Item : Prod
Add Item : Prod
Add Item : Prod
Upvotes: 2
Views: 346
Reputation: 45005
You get this behavior because when we use synchronized blocks
also called intrinsic locks
, we have no guarantee which one of the waiting thread will get the lock first (the intrinsic lock
is an unfair
lock). To get the behavior that you expect, you need to use a fair explicit lock
, this way you are sure that the first thread waiting for the lock will acquire it first. Here is how you create a fair explicit lock
with its corresponding condition
:
class BlockingQueue {
private final Lock lock = new ReentrantLock(true);
private final Condition condition = lock.newCondition();
...
The code of enqueue will then be:
public void enqueue(Object item)
throws InterruptedException {
try {
lock.lock();
while(this.queue.size() == this.limit) {
System.out.println("Wait Enque : "+Thread.currentThread().getName());
condition.await();
}
Thread.sleep(1000);
System.out.println("Add Item : " + Thread.currentThread().getName());
this.queue.add(item);
condition.signalAll();
} finally {
lock.unlock();
}
}
And finally the code of dequeue will be:
public Object dequeue()
throws InterruptedException{
try {
lock.lock();
while(this.queue.size() == 0){
System.out.println("Wait Denque : "+Thread.currentThread().getName());
condition.await();
}
Thread.sleep(1000);
System.out.println("Remove Item : " + Thread.currentThread().getName());
condition.signalAll();
return this.queue.remove(0);
} finally {
lock.unlock();
}
}
The output is then:
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Add Item : Prod
Remove Item : Cons
Upvotes: 0
Reputation: 4695
Your implementation of BlockingQueue
demonstrates the "Sleeping at Work" pattern that should be avoided.
For some reason, you decided to call Thread.sleep
in your data structure's synchronized
methods enqueue and dequeue
. I don't think that is required at all. What those methods should do at a minimum is to use the shared mutable state (i.e. your queue
) in a thread-safe manner. And everything except the call to Thread.sleep()
in those methods is a good first attempt:
public synchronized void enqueue(Object item)
throws InterruptedException {
while (this.queue.size() == this.limit) {
System.out.println("Wait Enque : "+Thread.currentThread().getName());
wait();
}
System.out.println("Add Item : " + item.toString() + " " + Thread.currentThread().getName());
this.queue.add(item);
notifyAll();
}
and similarly for dequeue
.
What your threads however are doing is they are being extremely greedy :-). Maybe you should actually do something with the dequeued item inside the run method of your threads:
while (true)
{
try
{
Object deq = blockingQueue.dequeue();
Thread.sleep(1000); // sleeping to simulate using the de-queued item
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
So, in effect, all I did was take the sleeping part out of your data structure's methods. And I get the following output which is interleaved and something that I'd expect:
Add Item : 0 Prod
Remove Item : 0 Cons
Add Item : 1 Prod
Remove Item : 1 Cons
Add Item : 2 Prod
Add Item : 3 Prod
Remove Item : 2 Cons
Add Item : 4 Prod
Add Item : 5 Prod
Of course, there are several improvements that one can suggest:
final
.CountDownLatch
to ensure some fairness.Take a look at java.util.concurrent.LinkedBlockingQueue
for some insights as well!
Upvotes: 2