watchzerg
watchzerg

Reputation: 662

Is there a thread-safe and element-unique queue in java?

Like a hybrid of "ConcurrentHashMap" and "ConcurrentLinkedQueue".

Here is my requirements:
I need a asynchronous updated cache system. That means I wrap every entity before set it into memcache. There is a timestamp in the warpper which indicate when its content should expire. Each request from front side would fetch data from memcache, and if the warpper shows expiration, an update event would be generated and put into a concurrentLinkedQueue, then waiting to be updated asynchronously.
The problem is: I don't want to update an entity more than once in vain. Before adding an event to the queue, I wish to find a way to make sure there is no event for the same entity in the queue already.

Is that OK if I do it in these ways?

1,Create a warpper Class, it contains a hashMap and a linkedList in it. All its method is synchronized:

public synchronized boolean add(String key,Object value){
    if(hashMap.containsKey(key)){
        return false;
    }else{
        hashMap.put(key,value);
        return linkedList.offer(value);
    }
}  

I believe this solution would be extremely slow.
Maybe It's just like Collections.synchronizedMap(new LinkedHashMap()).

2,Just use a concurrentHashMap. If I need "poll" action, iterator an element from it.

public Object poll(){
    Collection valueColl = concurrentHashMap.values();
    if(valueColl.isEmpty()){
        retrun null;
    }
    return valueColl.get(0);
}  

The action concurrentHashMap.values().get(0) is slow or not?

3,Looking into the source code of "ConcurrentHashMap" and "ConcurrentLinkedQueue", then write an "ConcurrentUniqueLinkedQueue" if possible.
This looks a little hard for me for the moment.

so, how would you guys say?

Upvotes: 4

Views: 4211

Answers (2)

Beto Gontijo
Beto Gontijo

Reputation: 1

import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

public class ConcurrentSetQueue<E> extends LinkedHashSet<E> implements Queue<E> {

    /**
     * 
     */
    private static final long serialVersionUID = 7906542722835397462L;

    final java.util.concurrent.locks.ReentrantLock lock = new ReentrantLock();

    public E remove() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E next = iterator().next();
            remove(next);
            return next;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E element() {
        return iterator().next();
    }

    @Override
    public boolean offer(E arg0) {
        return add(arg0);
    }

    @Override
    public E peek() {
        return element();
    }

    @Override
    public E poll() {
        return remove();
    }

}

Upvotes: 0

Peter Lawrey
Peter Lawrey

Reputation: 533432

I don't imagine you want to discard the latest updates. Perhaps you are making it more complicated than it needs to be.

public void add(K key, V value) {
    concurrentMap.put(key, value);
    queue.add(key);
}

public V poll() {
    for(K key; (key = queue.take()) != null;) {
        V value = concurrentMap.remove(key);
        if (value != null)
           return value;
        // value will be null if it's a duplicate so ignore and look for more.
    }
    return null;
}

This will give you the latest value for key in queued order. It doesn't need any locking.

Upvotes: 6

Related Questions