Reputation: 1330
I currently have a concurrent queue implementation that uses a BlockingQueue
as the data store. I now need to introduce a second type of object that has a higher priority, leading me towards a starvation/priority queue for the original queue. So we're working with objects of type A and type B being produced from multiple threads. Any objects of type B should be processed before those of type A, but other than that FIFO order MUST be maintained. So if { 1A, 1B, 2A, 3A, 2B } are inserted the order should be {1B, 2B, 1A, 2A, 3A}
I tried a single PriorityBlockingQueue
to push type Bs to the front, but I couldn't maintain the FIFO requirement (there's no natural order between items of the same type).
My next thought is to use two concurrent queues. I'm looking for common gotchas or considerations when coordinating access between the two queues. Ideally, I'd want to do something like this:
public void add(A a)
{
aQueue.add(a);
}
public void add(B b)
{
bQueue.add(b);
}
private void consume()
{
if(!bQueue.isEmpty())
process(bQueue.poll());
else if(!aQueue.isEmpty())
process(aQueue.poll());
}
Would I need any synchronization or locks if both queues are ConcurrentLinkedQueue
(or insert more appropriate structure here)? Note I have many producers, but only one consumer (single threaded ThreadPoolExecutor
).
EDIT: If a B comes in after the isEmpty() check, it's ok to process an A and handle it on the next consume() call.
Upvotes: 4
Views: 1864
Reputation: 1061
Based on your requirements your method here should work fine.
I would make the following change to your code. This would block on your getNextItem() until one of the queues returns an object for you.
private Object block = new Object();
public void add(A a)
{
synchronized( block )
{
aQueue.add( a );
block.notifyAll();
}
}
public void add(B b)
{
synchronized( block )
{
bQueue.add( b );
block.notifyAll();
}
}
private Object consume()
{
Object value = null
synchroinzed( block )
{
while ( return == null )
{
value = bQueue.poll();
if ( value == null ) value = aQueue.poll();
if ( value == null ) block.wait();
}
}
return value;
}
Upvotes: 3
Reputation: 26713
I am not sure if I got your situation right, but I think it should be possible to solve this using a single queue.
You said your objects (in the queue) should be comparable by natural order and type. If there is no natural order, simply have a sequence generator (i.e. AtomicLong
) that will give your objects unique, always-incrementing queue ID. Getting data from AtomicLong
should take no time, unless you're in the world of nanoseconds.
So your Comparator.compare
should be like this:
1) check object type. If it is different (A
VS B
), return 1/-1. Otherwise, see below
2) check id. It is guaranteed to be different.
If you cannot change objects (A and B), you can still wrap them into another object which would contain that ID.
Upvotes: 3
Reputation: 10359
You need to synchronize a getQueueItem() method.
With A and B implementing QueueItem Interface.
public void add(A a)
{
aQueue.add(a);
}
public void add(B b)
{
bQueue.add(b);
}
private void consume()
{
process(getNextItem());
}
private QueueItem getNextItem()
{
synchronized(bQueue) {
if(!bQueue.isEmpty()) return bQueue.poll();
return aQueue.poll();
}
}
Upvotes: 1