Reputation: 116
I'm trying to add functionality to ArrayBlockingQueue: specifically, I want the queue to keep only unique elements, i.e., not enqueue an entry if it is already contained in the queue. Since the functionality I want is the same as the extension of Vector in section 4.4 of JCIP, I tried implementing it using the approaches described there.
Implementation by composition seemed like the way to go at first, producing code such as:
public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
    private final BlockingQueue<E> backingQueue;

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    @Override
    public synchronized boolean offer(E e) {
        if (backingQueue.contains(e)) {
            return false;
        }
        return backingQueue.offer(e);
    }

    @Override
    public synchronized E take() throws InterruptedException {
        return backingQueue.take();
    }

    // Other methods...
}
Unfortunately, when composing an ArrayBlockingQueue, this approach yields a deadlock in the following simple scenario:
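The scenario itself is not reproduced in this post. As a minimal sketch of one way such a deadlock can arise, assume a consumer calls take() on an empty queue before any producer calls offer(): take() then blocks inside the backing queue while still holding the wrapper's monitor, so offer() can never enter. The class names SynchronizedDistinctQueue and DeadlockDemo are hypothetical, and the wrapper is trimmed down to the two methods shown above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, trimmed-down version of the synchronized wrapper: offer and take only.
class SynchronizedDistinctQueue<E> {
    private final BlockingQueue<E> backingQueue;

    SynchronizedDistinctQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    public synchronized boolean offer(E e) {
        return !backingQueue.contains(e) && backingQueue.offer(e);
    }

    public synchronized E take() throws InterruptedException {
        // Blocks on an empty queue while still holding this wrapper's monitor.
        return backingQueue.take();
    }
}

class DeadlockDemo {
    // Returns true if the producer is still stuck in offer() after waitMillis.
    static boolean producerStuck(long waitMillis) {
        SynchronizedDistinctQueue<String> queue =
                new SynchronizedDistinctQueue<>(new ArrayBlockingQueue<>(10));
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException ignored) {
                // interrupted at the end of the demo to release the monitor
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        try {
            // Wait until the consumer is parked inside the backing queue's take().
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(10);
            }
            Thread producer = new Thread(() -> queue.offer("a"));
            producer.setDaemon(true);
            producer.start();
            producer.join(waitMillis);
            boolean stuck = producer.isAlive(); // still waiting for the monitor
            consumer.interrupt();               // let both threads finish
            return stuck;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("producer stuck: " + producerStuck(500));
    }
}
```

In this sketch the producer never gets past the synchronized keyword, even though the backing queue has free capacity, because the element the consumer is waiting for can only arrive through the method it is locking out.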
My question is, how is it possible to achieve this functionality without rewriting ArrayBlockingQueue?
Upvotes: 3
Views: 1047
Reputation: 116
I found a partial answer to my question. The offer operation is not atomic, as I had wanted, but the queue does stay distinct.
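import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
    private final BlockingQueue<E> backingQueue;
    // Concurrent set mirroring the elements currently in (or being added to) the queue
    private final Set<E> entriesSet = ConcurrentHashMap.newKeySet();

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
        entriesSet.addAll(backingQueue);
    }

    @Override
    public boolean offer(E e) {
        // Set.add returns false if e is already present
        if (!entriesSet.add(e)) {
            return false;
        }
        boolean added = backingQueue.offer(e);
        if (!added) {
            // The queue rejected e (e.g. it is full), so undo the set entry
            entriesSet.remove(e);
        }
        return added;
    }

    @Override
    public E take() throws InterruptedException {
        E e = backingQueue.take();
        entriesSet.remove(e);
        return e;
    }

    // Other methods...
}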
The additional Set is not an issue, since I would want to use one anyway in order to have reasonable performance.
However, I can think of one issue with this implementation: if it's used with a bounded queue implementation (such as ArrayBlockingQueue), the set itself is not bounded, so it can grow very large when many offers are blocked.
This solution also splits an operation that clearly should be atomic, so I strongly suspect there are other issues that I'm overlooking.
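To make the unbounded-set concern concrete, here is a rough sketch that assumes the blocking put() follows the same add-to-set-first pattern as offer(). The put() method, the class names, and the choice to silently drop duplicates are all assumptions for illustration and are not part of the code above.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down wrapper with a put() written in the same style as offer().
class SetBackedDistinctQueue<E> {
    private final BlockingQueue<E> backingQueue;
    final Set<E> entriesSet = ConcurrentHashMap.newKeySet();

    SetBackedDistinctQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    // Assumption: a duplicate is silently dropped, since put() returns void.
    public void put(E e) throws InterruptedException {
        if (!entriesSet.add(e)) {
            return;
        }
        try {
            backingQueue.put(e); // may block while e already sits in entriesSet
        } catch (InterruptedException ex) {
            entriesSet.remove(e); // roll back if the wait is abandoned
            throw ex;
        }
    }
}

class SetGrowthDemo {
    // Returns the set size observed while `blocked` producers wait on a full queue of capacity 1.
    static int setSizeWhileBlocked(int blocked) {
        SetBackedDistinctQueue<Integer> queue =
                new SetBackedDistinctQueue<>(new ArrayBlockingQueue<>(1));
        try {
            queue.put(0); // fills the backing queue
            Thread[] producers = new Thread[blocked];
            for (int i = 0; i < blocked; i++) {
                final int value = i + 1;
                producers[i] = new Thread(() -> {
                    try {
                        queue.put(value);
                    } catch (InterruptedException ignored) {
                        // interrupted at the end of the demo
                    }
                });
                producers[i].setDaemon(true);
                producers[i].start();
            }
            // Wait until every producer is parked inside the backing queue.
            for (Thread p : producers) {
                while (p.getState() != Thread.State.WAITING) {
                    Thread.sleep(10);
                }
            }
            int size = queue.entriesSet.size();
            for (Thread p : producers) {
                p.interrupt();
            }
            return size;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }
}
```

With a backing queue of capacity 1 and five blocked producers, the set in this sketch holds six entries while the queue holds one, so the set's size tracks the number of waiting producers and not the queue's capacity.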
Upvotes: 1
Reputation: 1359
Perhaps one simple and fast solution would be to use a java.util.concurrent.ConcurrentMap:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
    private final BlockingQueue<E> backingQueue;
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    @Override
    public boolean offer(E e) {
        // add[0] is set only if this call created the map entry for e
        boolean[] add = {false};
        elements.computeIfAbsent(e, k -> add[0] = true);
        // Caveat: if backingQueue.offer(e) fails here, e stays in the map
        // (handled in the edited version further down)
        return add[0] && backingQueue.offer(e);
    }

    @Override
    public E take() throws InterruptedException {
        E e = backingQueue.take();
        elements.remove(e);
        return e;
    }

    // Other methods
}
Note that there's no need for explicit synchronization: computeIfAbsent decides atomically which caller gets to add the element.
EDIT:
The documentation for java.util.concurrent.ConcurrentHashMap says:
/**
 * If the specified key is not already associated with a value,
 * attempts to compute its value using the given mapping function
 * and enters it into this map unless {@code null}. The entire
 * method invocation is performed atomically, so the function is
 * applied at most once per key. Some attempted update operations
 * on this map by other threads may be blocked while computation
 * is in progress, so the computation should be short and simple,
 * and must not attempt to update any other mappings of this map.
 *
 * @param key key with which the specified value is to be associated
 * @param mappingFunction the function to compute a value
 * @return the current (existing or computed) value associated with
 *         the specified key, or null if the computed value is null
 * @throws NullPointerException if the specified key or mappingFunction
 *         is null
 * @throws IllegalStateException if the computation detectably
 *         attempts a recursive update to this map that would
 *         otherwise never complete
 * @throws RuntimeException or Error if the mappingFunction does so,
 *         in which case the mapping is left unestablished
 */
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    ...
}
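The "applied at most once per key" guarantee quoted above can be checked with a small sketch like the following, which counts how often the mapping function runs when many threads race on the same key. The class ComputeIfAbsentDemo and its methods are hypothetical names; the claim() helper shows a possible alternative based on putIfAbsent, which also reports atomically whether the caller was first and avoids the boolean[] holder.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class ComputeIfAbsentDemo {
    // Counts how many times the mapping function runs when `threads` threads race on one key.
    static int countInvocations(int threads) {
        ConcurrentMap<String, Boolean> elements = new ConcurrentHashMap<>();
        AtomicInteger invocations = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // release all workers at roughly the same time
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                elements.computeIfAbsent("a", k -> {
                    invocations.incrementAndGet();
                    return Boolean.TRUE;
                });
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return invocations.get();
    }

    // Alternative sketch: putIfAbsent returns null only for the caller that created the entry.
    static <E> boolean claim(ConcurrentMap<E, Boolean> elements, E e) {
        return elements.putIfAbsent(e, Boolean.TRUE) == null;
    }
}
```

Either call gives each element exactly one "winner"; what neither can do is make the map update and the subsequent queue operation a single atomic step.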
I've added some additional checks:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
    private final BlockingQueue<E> backingQueue;
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    @Override
    public boolean offer(E e) {
        boolean[] add = {false};
        elements.computeIfAbsent(e, k -> add[0] = true);
        if (add[0]) {
            // make sure that the element was added to the queue,
            // otherwise we must remove it from the map
            if (backingQueue.offer(e)) {
                return true;
            }
            elements.remove(e);
        }
        return false;
    }

    @Override
    public E take() throws InterruptedException {
        E e = backingQueue.take();
        elements.remove(e);
        return e;
    }

    @Override
    public String toString() {
        return backingQueue.toString();
    }

    // Other methods
}
And now let's do some concurrency tests:
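import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Run inside a method that declares "throws InterruptedException" (required by invokeAll)
BlockingQueue<String> queue = new DistinctBlockingQueue<>(new ArrayBlockingQueue<>(100));
int n = 1000;
ExecutorService producerService = Executors.newFixedThreadPool(n);
Callable<Void> producer = () -> {
    queue.offer("a");
    return null;
};
producerService.invokeAll(IntStream.range(0, n).mapToObj(i -> producer).collect(Collectors.toList()));
producerService.shutdown();
System.out.println(queue); // prints [a]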
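The test above only exercises concurrent offers of a single element. As a complementary, single-threaded sketch, the following checks the rollback path added in the edit: an element rejected by a full queue should not linger in the map and block a later offer. MapBackedDistinctQueue and RollbackDemo are hypothetical names, and the wrapper is trimmed down to offer and take.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, trimmed-down copy of the edited wrapper: offer and take only.
class MapBackedDistinctQueue<E> {
    private final BlockingQueue<E> backingQueue;
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();

    MapBackedDistinctQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    public boolean offer(E e) {
        boolean[] add = {false};
        elements.computeIfAbsent(e, k -> add[0] = true);
        if (add[0]) {
            if (backingQueue.offer(e)) {
                return true;
            }
            elements.remove(e); // queue rejected e, so undo the map entry
        }
        return false;
    }

    public E take() throws InterruptedException {
        E e = backingQueue.take();
        elements.remove(e);
        return e;
    }
}

class RollbackDemo {
    // Records the outcome of each step against a backing queue of capacity 1.
    static List<Boolean> run() {
        MapBackedDistinctQueue<String> queue =
                new MapBackedDistinctQueue<>(new ArrayBlockingQueue<>(1));
        List<Boolean> results = new ArrayList<>();
        results.add(queue.offer("a")); // accepted: queue was empty
        results.add(queue.offer("a")); // rejected: duplicate
        results.add(queue.offer("b")); // rejected: queue is full, map entry rolled back
        try {
            results.add("a".equals(queue.take())); // frees the single slot
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        results.add(queue.offer("b")); // accepted only because of the rollback
        return results;
    }
}
```

Without the elements.remove(e) call in offer, the last step in this sketch would return false, because "b" would still be recorded in the map from the earlier rejected attempt.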
Upvotes: 4