Jiew Meng

Reputation: 88197

Check before taking from a Java BlockingQueue

The problem I have is similar to a single-producer, multiple-consumer problem, except that the consumers are "different": I need a way of peeking at the new item (to check who it's for) before taking it.

The actual setup is a single server thread that serves multiple client threads. A client requests info, and the server should then reply to that client.

How can I do that? A possibility is a loop like:

while (true) {
    if (q.peek() ... check here ...) {
        // do something
    } else {
        Thread.sleep(...); // avoid taking up too much CPU?
    }
}

But that doesn't seem ideal or right?

Upvotes: 0

Views: 934

Answers (3)

Evgeniy Dorofeev

Reputation: 136012

Maybe it's better not to use BlockingQueue at all:

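public class Synchronizer {
    private Object obj;

    // Hands one object to a consumer and blocks until it has been taken.
    synchronized void put(Object obj) throws InterruptedException {
        this.obj = obj;
        notifyAll();
        // Check the field, not the parameter that shadows it.
        while (this.obj != null) {
            wait();
        }
    }

    // Blocks until an object of the expected type is available, then takes it.
    // MyObject stands for the type this consumer is interested in.
    synchronized Object take() throws InterruptedException {
        // Test the condition before waiting so an earlier put() is not missed.
        while (!(obj instanceof MyObject)) {
            wait();
        }
        Object tmp = obj;
        obj = null;
        notifyAll(); // wake the producer blocked in put()
        return tmp;
    }
}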

Though there may be a version of a Synchronizer that consumes from a BlockingQueue.

Upvotes: 0

Cheok Yan Cheng

Reputation: 42700

You may need to make the following operations atomic, by synchronizing them in a single block.

// Make sure queue is final. If not, use a final Object monitor.
synchronized(queue) {
    queue.peek(); // check whether the head is meant for this consumer
    queue.poll(); // if so, remove it
}

This ensures that once your consumer thread discovers that the queue contains the right message (through BlockingQueue.peek), it can consume that same message (through BlockingQueue.poll) before another consumer does.
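A slightly fuller sketch of that peek-then-poll block, assuming a FIFO queue and assuming every consumer goes through the same helper so that they all lock on the same queue object. GuardedQueue and pollIfMatches are made-up names for illustration, not part of java.util.concurrent:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

// Hypothetical helper, not part of java.util.concurrent.
final class GuardedQueue {
    private GuardedQueue() {}

    // Removes and returns the head only if it matches the predicate.
    // Returns null and leaves the queue untouched otherwise.
    // Assumption: all consumers call this method, so they all lock on the
    // same queue object and cannot interleave between peek() and poll().
    static <T> T pollIfMatches(BlockingQueue<T> queue, Predicate<? super T> isMine) {
        synchronized (queue) {
            T head = queue.peek();          // null when the queue is empty
            if (head != null && isMine.test(head)) {
                return queue.poll();        // removes the element just inspected
            }
            return null;
        }
    }
}
```

A consumer would call something like GuardedQueue.pollIfMatches(queue, msg -> msg.startsWith("clientA:")), where the prefix check is only a placeholder for however you tell who a message is for. Note that this call returns immediately, so the caller still has to loop.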

Still, your solution is not ideal. You are implementing a technique called busy polling, which can waste a lot of CPU time. You do not take advantage of the blocking calls, BlockingQueue.take and BlockingQueue.poll(timeout, unit), which put your consumer thread into a waiting state until data is available. (The no-argument poll returns null immediately when the queue is empty.)

To take full advantage of the blocking calls, each consumer thread should have its own queue, so that it can simply call take without having to busy-peek.
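For the single-server, many-clients case in the question, one way this might look is for each request to carry the client's own reply queue. This is only a sketch: ReplyQueueDemo, Request, startServer and ask are hypothetical names, and the "reply to ..." string stands in for whatever the server really computes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical names throughout; not the asker's actual classes.
final class ReplyQueueDemo {
    // A request carries the queue the server should reply on.
    static final class Request {
        final String payload;
        final BlockingQueue<String> replyTo;

        Request(String payload, BlockingQueue<String> replyTo) {
            this.payload = payload;
            this.replyTo = replyTo;
        }
    }

    // Server loop: blocks in take() until a request arrives, no peeking or sleeping.
    static Thread startServer(BlockingQueue<Request> requests) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request r = requests.take();
                    r.replyTo.put("reply to " + r.payload); // placeholder reply
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        server.setDaemon(true);
        server.start();
        return server;
    }

    // Client side: send a request, then block on this client's own queue.
    static String ask(BlockingQueue<Request> requests, String payload) {
        BlockingQueue<String> myReplies = new LinkedBlockingQueue<>();
        try {
            requests.put(new Request(payload, myReplies));
            return myReplies.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reply", e);
        }
    }
}
```

Because only the owning client ever reads from its reply queue, nobody needs to peek to find out who a message is for.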

Upvotes: 0

Brigham

Reputation: 14524

Here are two options:

Option 1. Have one consumer of the queue that takes items off and delegates them to the "real" consumers. This one will require a lot of work on your part, because your main consumer will have to know which of the "real" consumers are busy. In addition, if one of the "real" consumers takes more resources than the others, it has the potential to block the queue while the main consumer waits to hand off the message.

The better solution is:

Option 2. Use one queue for each type of consumer. Your producer will determine which queue each message type belongs to and drop it into the correct queue. Then each consumer will pull from the queue it's interested in.
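A minimal sketch of Option 2, assuming messages can be classified by a string type key. TypedRouter, queueFor and publish are illustrative names I made up, and String is just a stand-in for your real message class:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical router: one queue per message type.
final class TypedRouter {
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

    // Returns the queue for a type, creating it on first use.
    BlockingQueue<String> queueFor(String type) {
        return queues.computeIfAbsent(type, t -> new LinkedBlockingQueue<>());
    }

    // Producer side: decide the type, drop the message into that queue.
    // The queues are unbounded here, so add() does not fail for lack of capacity.
    void publish(String type, String message) {
        queueFor(type).add(message);
    }
}
```

Each consumer would then block on its own queue with something like router.queueFor("audio").take(), where "audio" is just an example key.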

Upvotes: 2
