D Levant

Reputation: 116

Adding Functionality to ArrayBlockingQueue

I'm trying to add functionality to ArrayBlockingQueue. Specifically, I want the queue to keep only unique elements, i.e., to not enqueue an entry that is already contained in the queue. Since this is the same functionality as the extension of Vector in section 4.4 of JCIP (Java Concurrency in Practice), I tried implementing it using the approaches described there.

My question is: how can I achieve this functionality without rewriting ArrayBlockingQueue?

Upvotes: 3

Views: 1047

Answers (2)

D Levant

Reputation: 116

I found a partial answer to my question. The offer operation is not atomic, as I wanted it to be, but the queue does hold only distinct elements.

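import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class DistinctBlockingQueue<E> implements BlockingQueue<E> {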
    private final BlockingQueue<E> backingQueue;
    private final Set<E> entriesSet = ConcurrentHashMap.newKeySet();

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
        entriesSet.addAll(backingQueue);
    }

    @Override
    public boolean offer(E e) {
        if (!entriesSet.add(e))
            return false;

        boolean added = backingQueue.offer(e);
        if (!added) {
            entriesSet.remove(e);
        }

        return added;
    }

    @Override
    public E take() throws InterruptedException {
        E e = backingQueue.take();
        entriesSet.remove(e);

        return e;
    }

    // Other methods...
}

The additional Set is not an issue, since I would want to use one anyway in order to have reasonable performance.

However, I can think of one issue with this implementation: if it is used with a bounded queue implementation (such as ArrayBlockingQueue), the set is not bounded, so it can grow very large when many offers are blocked.

This solution splits an operation that clearly should be atomic, so I strongly suspect there are other issues that I'm overlooking.
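For comparison, here is a minimal sketch of the composition approach with a single lock, so that the membership check and the enqueue happen as one atomic step. The class name LockedDistinctQueue is hypothetical, the sketch covers only the non-blocking offer and poll (blocking put and take would additionally need condition variables), and it assumes nothing else touches the backing queue directly. Because the set is only updated after a successful enqueue, it never holds more entries than the queue does.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper (not from the original post): composition plus one lock.
final class LockedDistinctQueue<E> {
    private final Queue<E> backingQueue;             // assumed to be private to this wrapper
    private final Set<E> entries = new HashSet<>();  // guarded by lock
    private final ReentrantLock lock = new ReentrantLock();

    LockedDistinctQueue(Queue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    // The duplicate check and the enqueue run under the same lock.
    boolean offer(E e) {
        lock.lock();
        try {
            if (entries.contains(e)) {
                return false; // duplicate
            }
            if (!backingQueue.offer(e)) {
                return false; // backing queue is full
            }
            entries.add(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Removes the head, if any, and forgets it so it can be offered again.
    E poll() {
        lock.lock();
        try {
            E e = backingQueue.poll();
            if (e != null) {
                entries.remove(e);
            }
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        LockedDistinctQueue<String> q = new LockedDistinctQueue<>(new ArrayBlockingQueue<>(2));
        System.out.println(q.offer("a")); // true
        System.out.println(q.offer("a")); // false, duplicate
        System.out.println(q.offer("b")); // true
        System.out.println(q.offer("c")); // false, queue is full
        System.out.println(q.poll());     // a
        System.out.println(q.offer("a")); // true, "a" was removed above
    }
}
```

The trade-off is that every operation serializes on one lock, which gives up the concurrency of the set-based version in exchange for atomicity.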

Upvotes: 1

FaNaJ

Reputation: 1359

Perhaps one simple and fast solution would be to use a java.util.concurrent.ConcurrentMap:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DistinctBlockingQueue<E> implements BlockingQueue<E> {

    private final BlockingQueue<E> backingQueue;
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    @Override
    public boolean offer(E e) {
        boolean[] add = {false};
        elements.computeIfAbsent(e, k -> add[0] = true);
        return add[0] && backingQueue.offer(e);
    }

    @Override
    public E take() throws InterruptedException {
        E e = backingQueue.take();
        elements.remove(e);
        return e;
    }

    // Other methods

}

Note that there's no need for synchronization.

EDIT:

The documentation of java.util.concurrent.ConcurrentHashMap says:

/**
 * If the specified key is not already associated with a value,
 * attempts to compute its value using the given mapping function
 * and enters it into this map unless {@code null}.  The entire
 * method invocation is performed atomically, so the function is
 * applied at most once per key.  Some attempted update operations
 * on this map by other threads may be blocked while computation
 * is in progress, so the computation should be short and simple,
 * and must not attempt to update any other mappings of this map.
 *
 * @param key key with which the specified value is to be associated
 * @param mappingFunction the function to compute a value
 * @return the current (existing or computed) value associated with
 *         the specified key, or null if the computed value is null
 * @throws NullPointerException if the specified key or mappingFunction
 *         is null
 * @throws IllegalStateException if the computation detectably
 *         attempts a recursive update to this map that would
 *         otherwise never complete
 * @throws RuntimeException or Error if the mappingFunction does so,
 *         in which case the mapping is left unestablished
 */
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    ...
}
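Since the mapping function here is only used to detect whether the key was absent, the same check can probably be written without the one-element array by using ConcurrentMap.putIfAbsent, which returns null when there was no previous mapping. The following is a small sketch of that idea; the class name ClaimSet and its methods claim and release are hypothetical and not part of the answer's code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper illustrating the "first caller wins" check.
final class ClaimSet<E> {
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();

    // Returns true only for the caller that actually inserted the key.
    boolean claim(E e) {
        return elements.putIfAbsent(e, Boolean.TRUE) == null;
    }

    // Forgets the key so that it can be claimed again later.
    void release(E e) {
        elements.remove(e);
    }

    public static void main(String[] args) {
        ClaimSet<String> set = new ClaimSet<>();
        System.out.println(set.claim("a")); // true
        System.out.println(set.claim("a")); // false
        set.release("a");
        System.out.println(set.claim("a")); // true
    }
}
```

In an offer method, claim(e) would take the place of the computeIfAbsent call and the add[0] flag.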

I've added some additional checks:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DistinctBlockingQueue<E> implements BlockingQueue<E> {

    private final BlockingQueue<E> backingQueue;
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
        this.backingQueue = backingQueue;
    }

    @Override
    public boolean offer(E e) {
        boolean[] add = {false};
        elements.computeIfAbsent(e, k -> add[0] = true);
        if (add[0]) {
            // make sure that the element was added to the queue,
            // otherwise we must remove it from the map
            if (backingQueue.offer(e)) {
                return true;
            }
            elements.remove(e);
        }
        return false;
    }

    @Override
    public E take() throws InterruptedException {
        E e = backingQueue.take();
        elements.remove(e);
        return e;
    }

    @Override
    public String toString() {
        return backingQueue.toString();
    }

    // Other methods

}

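And now, let's do some concurrency tests:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// The statements below belong inside a method that declares "throws InterruptedException".
BlockingQueue<String> queue = new DistinctBlockingQueue<>(new ArrayBlockingQueue<>(100));

int n = 1000;
ExecutorService producerService = Executors.newFixedThreadPool(n);

// Every task offers the same element.
Callable<Void> producer = () -> {
    queue.offer("a");
    return null;
};

// invokeAll blocks until every task has completed.
producerService.invokeAll(IntStream.range(0, n).mapToObj(i -> producer).collect(Collectors.toList()));
producerService.shutdown();

System.out.println(queue); // prints [a]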
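A slightly stricter check than printing the queue is to count how many offers returned true. The sketch below is a self-contained variant under some assumptions: the class DistinctOfferCheck is hypothetical and reproduces only the offer path (set first, then backing queue, with rollback on failure), and it uses a pool of 8 threads instead of 1000. With no consumer running, exactly one offer of "a" should be accepted.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical harness: only the offer path of the distinct queue is reproduced.
final class DistinctOfferCheck {
    private final BlockingQueue<String> backingQueue = new ArrayBlockingQueue<>(100);
    private final Set<String> entries = ConcurrentHashMap.newKeySet();

    boolean offer(String e) {
        if (!entries.add(e)) {
            return false;          // already present
        }
        boolean added = backingQueue.offer(e);
        if (!added) {
            entries.remove(e);     // roll back if the queue was full
        }
        return added;
    }

    int queueSize() {
        return backingQueue.size();
    }

    // Offers the same value from many tasks and returns how many offers succeeded.
    static int countAccepted(DistinctOfferCheck queue, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        AtomicInteger accepted = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                if (queue.offer("a")) {
                    accepted.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return accepted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        DistinctOfferCheck queue = new DistinctOfferCheck();
        System.out.println(countAccepted(queue, 1000)); // prints 1
        System.out.println(queue.queueSize());          // prints 1
    }
}
```

This only exercises concurrent producers. Interleaving with take is where the non-atomic split between the set and the queue could show up, and it is not covered here.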

Upvotes: 4
