Reputation: 5287
I would like to know the best mechanism for implementing a multiple-producer, single-consumer scenario in which I have to keep the current number of unprocessed requests up to date.
My first thought was to use ConcurrentLinkedQueue:
public class SomeQueueAbstraction {
    private Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
    private int size;

    public void add(Object request) {
        SomeObject object = convertIncomingRequest(request);
        concurrentQueue.add(object);
        size++; // not atomic with the add above
    }

    public SomeObject getHead() {
        SomeObject object = concurrentQueue.poll();
        size--; // not atomic with the poll above
        return object;
    }
    // other methods
}
The problem with this is that I have to explicitly synchronize on add and size++, as well as on poll and size--, to always have an accurate size, which makes ConcurrentLinkedQueue pointless to begin with.
What would be the best way to get the best possible performance while maintaining data consistency?
Should I use ArrayDeque instead and synchronize explicitly, or is there a better way to achieve this?
There is a somewhat similar question/answer here: java.util.ConcurrentLinkedQueue, where it is discussed how composite operations on ConcurrentLinkedQueue are naturally not atomic, but there is no direct answer as to the best option for the given scenario.
Note: I am calculating the size explicitly because the time complexity of the built-in .size() method is O(n).
Note 2: I am also worried that the getSize() method, which I haven't written out here, will add even more contention overhead. It could be called relatively frequently.
I am looking for the most efficient way to handle multiple producers and a single consumer with frequent getSize() calls.
Alternative suggestion: if there were an elementId in the SomeObject structure, I could get the current size from ConcurrentLinkedQueue.poll(), and locking would only be needed in the mechanism that generates such an id. Add and get could then be used properly without additional locking. How would this fare as an alternative?
Upvotes: 1
Views: 628
Reputation: 6538
So the requirement is to report an up-to-date count of unprocessed requests. This count is requested often, which indeed makes ConcurrentLinkedQueue.size() unsuitable.
This can be done using an AtomicInteger: it is fast and stays as close to the current number of unprocessed requests as possible.
Here is an example; note the small changes that ensure the reported size is accurate:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SomeQueueAbstraction {
    private final Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();

    public boolean add(Object request) {
        SomeObject object = convertIncomingRequest(request);
        if (concurrentQueue.add(object)) {
            size.incrementAndGet();
            return true;
        }
        return false;
    }

    public SomeObject remove() {
        SomeObject object = concurrentQueue.poll();
        if (object != null) {
            size.decrementAndGet();
        }
        return object;
    }

    public int getSize() { return size.get(); }

    private SomeObject convertIncomingRequest(Object request) {
        return new SomeObject(getSize());
    }

    class SomeObject {
        int id;
        SomeObject(int id) { this.id = id; }
    }
}
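To see how such a counter behaves with several producers, here is a minimal, self-contained sketch. The names CountedQueue, CountedQueueDemo and fill are hypothetical and only used for illustration; the generic class simply mirrors the add/remove/getSize logic of the example above.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical generic variant of the class above, for illustration only.
class CountedQueue<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();

    void add(T item) {
        queue.add(item);           // unbounded queue, so add does not reject items
        size.incrementAndGet();    // counter is updated after the element is visible
    }

    T remove() {
        T item = queue.poll();     // null when the queue is empty
        if (item != null) {
            size.decrementAndGet();
        }
        return item;
    }

    int getSize() {
        return size.get();         // lock-free read of the counter
    }
}

class CountedQueueDemo {
    // Starts `producers` threads that each add `perProducer` items and waits for them.
    static CountedQueue<Integer> fill(int producers, int perProducer)
            throws InterruptedException {
        CountedQueue<Integer> q = new CountedQueue<>();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    q.add(i);
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return q;
    }

    public static void main(String[] args) throws InterruptedException {
        CountedQueue<Integer> q = fill(4, 10_000);
        System.out.println("after producers: " + q.getSize());
        int drained = 0;
        while (q.remove() != null) {   // single consumer drains the queue
            drained++;
        }
        System.out.println("drained: " + drained + ", size now: " + q.getSize());
    }
}
```

Once all producers have finished, the counter matches the number of queued items exactly. While producers and the consumer run at the same time, the queue update and the counter update are still two separate steps, so getSize() can briefly lag behind the queue (it can even dip below zero for a moment if the consumer polls an element before the producer has incremented). If that matters, incrementing before the add is one way to keep the reported value from going negative.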
Upvotes: 1
Reputation: 1123
You can use an explicit lock, which means you probably won't need a concurrent queue.
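import java.util.LinkedList;
import java.util.Queue;

public class SomeQueueAbstraction {
    private final Queue<SomeObject> queue = new LinkedList<>();
    // Guarded by lock; every access below holds it, so volatile is not needed.
    private int size;
    private final Object lock = new Object();

    public void add(Object request) {
        // Convert outside the lock to keep the critical section short.
        SomeObject object = convertIncomingRequest(request);
        synchronized (lock) {
            queue.add(object);
            size++;
        }
    }

    public SomeObject getHead() {
        synchronized (lock) {
            SomeObject object = queue.poll();
            // poll() returns null on an empty queue; only count real removals.
            if (object != null) {
                size--;
            }
            return object;
        }
    }

    public int getSize() {
        synchronized (lock) {
            return size;
        }
    }
    // other methods
}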
This way, adding/removing elements to/from the queue and updating the size are done safely.
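As a variation on the same idea, here is a minimal sketch using the ArrayDeque mentioned in the question. Once every access goes through one lock, a separate counter is not strictly needed, because ArrayDeque.size() (like LinkedList.size()) runs in constant time. LockedQueue and LockedQueueDemo are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;

// Hypothetical sketch: one monitor guards the deque, which tracks its own size.
class LockedQueue<T> {
    private final ArrayDeque<T> deque = new ArrayDeque<>();

    synchronized void add(T item) {
        deque.addLast(item);
    }

    synchronized T getHead() {
        return deque.pollFirst();  // null when the deque is empty
    }

    synchronized int getSize() {
        return deque.size();       // constant-time for ArrayDeque
    }
}

class LockedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        LockedQueue<String> q = new LockedQueue<>();
        // Two producers add 1000 items each.
        Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) q.add("a" + i); });
        Thread b = new Thread(() -> { for (int i = 0; i < 1000; i++) q.add("b" + i); });
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(q.getSize()); // 2000
        q.getHead();                     // the single consumer takes one item
        System.out.println(q.getSize()); // 1999
    }
}
```

The trade-off is that getSize() now competes for the same lock as the producers and the consumer, so very frequent size checks add contention. Whether that is acceptable depends on the workload and would need measuring.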
Upvotes: 1