Reputation: 14185

Multi-threading: Continuous writing to shared data structure and periodically purging

I have multiple threads that continuously write to a shared data structure, for example a ConcurrentHashMap. I want to periodically process (via a TimerJob) everything that is in the map up to that point, while other threads keep writing to it. The new data will be processed the next time the TimerJob runs.

What would be the best way to accomplish this? From what I have read, this problem looks a lot like triple buffering, but I am not completely sure about that.

Any thoughts?

Edit: I want to remove the data from the map after processing it, so that I don't end up re-processing it.
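If the map itself is kept, one way to process and remove entries without swapping it out is to rely on `ConcurrentHashMap.remove(key, value)`, which removes an entry only if it still maps to the value that was processed. This is a minimal sketch, not taken from either answer; the class `DrainInPlace` and method `drain` are hypothetical names:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

public class DrainInPlace {
    // Processes every entry the iterator sees and then removes it.
    // ConcurrentHashMap iterators are weakly consistent, so iterating while
    // other threads write (and while we remove) never throws; entries added
    // during the drain are either seen now or left for the next run.
    public static <K, V> int drain(ConcurrentHashMap<K, V> map, BiConsumer<K, V> processor) {
        int removed = 0;
        for (Map.Entry<K, V> e : map.entrySet()) {
            K key = e.getKey();
            V value = e.getValue();
            processor.accept(key, value);
            // Only removes the entry if nobody replaced the value meanwhile;
            // a newer value stays in the map and is processed next time.
            if (map.remove(key, value)) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        int n = drain(map, (k, v) -> System.out.println(k + "=" + v));
        System.out.println(n + " processed, " + map.size() + " left"); // prints "2 processed, 0 left"
    }
}
```

One assumption built into this design: if another thread writes an equal value for the same key while that entry is being processed, the write is treated as already handled and removed.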

Edit: I don't necessarily need to write the data to a HashMap/Set. I just need to put it in a collection that I can process periodically while other threads are still writing to it.
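Since any collection will do, a queue may be simpler than a map. In this sketch (an assumption, not from either answer; `QueueBuffer` is a hypothetical name) writers `offer` into a `LinkedBlockingQueue` and the periodic job calls `drainTo`, which removes everything it hands over, so nothing is processed twice:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBuffer<T> {
    // Unbounded queue: offer() never blocks, so writer threads are not held up.
    private final BlockingQueue<T> pending = new LinkedBlockingQueue<>();

    // Called by any number of writer threads.
    public void add(T item) {
        pending.offer(item);
    }

    // Called by the periodic job. drainTo moves the queued elements into the
    // batch and removes them from the queue. Items offered later stay queued
    // for the next run (OpenJDK's LinkedBlockingQueue only takes the elements
    // counted when drainTo starts).
    public List<T> drain() {
        List<T> batch = new ArrayList<>();
        pending.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        QueueBuffer<String> buffer = new QueueBuffer<>();
        buffer.add("first");
        buffer.add("second");
        System.out.println(buffer.drain()); // prints [first, second]
        System.out.println(buffer.drain()); // prints []
    }
}
```

Unlike a map, a queue keeps every write and its order, so if the same key can be written many times between runs and only the latest value matters, a map-based approach fits better.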

Upvotes: 0

Views: 244

Answers (2)

OldCurmudgeon

Reputation: 65821

I would use double buffering and a Read/Write lock.

Double buffering reduces hold-ups: the swapped-out map can be processed while writers carry on with the new one.

Using a Read/Write lock allows me to be certain that no one is still writing to the old map after we swap.

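import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

class DoubleBufferedMap<K, V> extends AbstractMap<K, V> {
    // Used whenever I want to create a new map. It must supply a thread-safe map
    // (e.g. ConcurrentHashMap::new), because the read lock is shared and many
    // writers may call put on the same map at the same time.
    private final Supplier<Map<K, V>> mapSupplier;
    // The underlying map.
    private volatile Map<K, V> map;
    // My lock - for ensuring no-one is writing.
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = readWriteLock.readLock();
    private final Lock writeLock = readWriteLock.writeLock();

    public DoubleBufferedMap(Supplier<Map<K, V>> mapSupplier) {
        this.mapSupplier = mapSupplier;
        this.map = mapSupplier.get();
    }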

    /**
     * Swaps out the current map with a new one.
     * 
     * @return the old map ready for processing, guaranteed to have no pending writes.
     */
    public Map<K, V> swap() {
        // The write lock is exclusive, so acquiring it waits for every in-flight
        // put (each holds the read lock) to complete.
        writeLock.lock();
        try {
            // Grab the existing map.
            Map<K, V> oldMap = map;
            // Replace it; puts that start after we unlock will see the new map.
            map = mapSupplier.get();
            return oldMap;
        } finally {
            writeLock.unlock();
        }
    }

    // Put methods must take a read lock (yeah I know it's weird): the read lock is
    // shared, so many threads can put at once, while swap() takes the exclusive
    // write lock to wait until none of them is mid-put.

    @Override
    public V put(K key, V value) {
        // Take a read-lock so swap() knows when I'm finished.
        readLock.lock();
        try {
            return map.put(key, value);
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        // Take a read-lock so swap() knows when I'm finished.
        readLock.lock();
        try {
            map.putAll(m);
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public V putIfAbsent(K key, V value) {
        // Take a read-lock so swap() knows when I'm finished.
        readLock.lock();
        try {
            return map.putIfAbsent(key, value);
        } finally {
            readLock.unlock();
        }
    }


    // All other methods are simply delegated, but you may wish to disallow some of them
    // (like `entrySet`, which would expose the map to modification without locks).

    @Override
    public Set<Entry<K, V>> entrySet() {
        return map.entrySet();
    }

    @Override
    public boolean equals(Object o) {
        return map.equals(o);
    }

    @Override
    public int hashCode() {
        return map.hashCode();
    }

    // ... The rest of the delegators (left to the student)
}

Upvotes: 0

HPCS

Reputation: 1454

I am not sure whether you need to keep all the data in the map, or whether the data already processed by the timer job can be dropped from it.

If you only need something like a snapshot for the timer job, you can swap the map for a new one like this:

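import java.util.concurrent.ConcurrentHashMap;

// Members of a class with type parameters <K, V>.
private volatile ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();

public void processByTimerJob() {
    ConcurrentHashMap<K, V> oldMap = this.map;
    this.map = new ConcurrentHashMap<>(); // everything new will be stored in the new map
    // Caveat: a writer that read this.map just before the swap may still put into
    // oldMap after it has been processed, and that entry would then be lost.
    oldMap.forEach((key, value) -> {
        // ... process the old map via iteration or whatever you want
    });
}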

Upvotes: 1
