Reputation: 14185
I have multiple threads that continuously write to a shared collection, for example a ConcurrentHashMap. I want to periodically process (via a TimerJob) everything that is in that map up to that point. Other threads can keep writing to it in the meantime (this new data will be processed the next time the TimerJob runs).
I was wondering what the best way to accomplish this would be. From what I have read, this problem looks a lot like triple buffering, but I am not sure about that.
Any thoughts?
Edit: I want to remove the data from the map after processing it, so that I don't end up re-processing it.
Edit: I don't necessarily need to write the data to a HashMap/Set. I just need to put it in a collection that I can process periodically while other threads are still writing to it.
Upvotes: 0
Views: 244
Reputation: 65821
I would use double buffering and a read/write lock.
Double buffering reduces holdups: the swapped-out map can be processed while writers carry on with the fresh one.
The read/write lock lets me be certain that no one is still writing to the old map after the swap.
import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
// @Nullable and @NotNull come from whichever annotations library you use; drop them if you have none.

class DoubleBufferedMap<K, V> extends AbstractMap<K, V> implements Map<K, V> {
    // Used whenever I want to create a new map. It should supply a thread-safe map
    // (e.g. ConcurrentHashMap::new), because many writers can hold the read lock at once.
    private final Supplier<Map<K, V>> mapSupplier;
    // The underlying map.
    private volatile Map<K, V> map;
    // My lock - for ensuring no-one is writing.
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = readWriteLock.readLock();
    private final Lock writeLock = readWriteLock.writeLock();

    public DoubleBufferedMap(Supplier<Map<K, V>> mapSupplier) {
        this.mapSupplier = mapSupplier;
        this.map = mapSupplier.get();
    }

    /**
     * Swaps out the current map with a new one.
     *
     * @return the old map ready for processing, guaranteed to have no pending writes.
     */
    public Map<K, V> swap() {
        // Grab the existing map.
        Map<K, V> oldMap = map;
        // Replace it.
        map = mapSupplier.get();
        // Take (and immediately release) the write lock to wait for all in-flight `put`s to complete.
        writeLock.lock();
        writeLock.unlock();
        return oldMap;
    }

    // Put methods take the read lock (yeah I know it's weird): many writers can hold it
    // at once, and swap() uses the write lock to wait for all of them.
    @Nullable
    @Override
    public V put(K key, V value) {
        // Take a read-lock so they know when I'm finished.
        readLock.lock();
        try {
            return map.put(key, value);
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public void putAll(@NotNull Map<? extends K, ? extends V> m) {
        // Take a read-lock so they know when I'm finished.
        readLock.lock();
        try {
            map.putAll(m);
        } finally {
            readLock.unlock();
        }
    }

    @Nullable
    @Override
    public V putIfAbsent(K key, V value) {
        // Take a read-lock so they know when I'm finished.
        readLock.lock();
        try {
            return map.putIfAbsent(key, value);
        } finally {
            readLock.unlock();
        }
    }

    // All other methods are simply delegated - but you may wish to disallow some of them
    // (like `entrySet` which would expose the map to modification without locks).
    @Override
    public Set<Entry<K, V>> entrySet() {
        return map.entrySet();
    }

    @Override
    public boolean equals(Object o) {
        return map.equals(o);
    }

    @Override
    public int hashCode() {
        return map.hashCode();
    }

    // ... The rest of the delegators (left to the student)
}
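To show how the swap could be driven from a timer, here is a rough, self-contained sketch. `SwappableBuffer` is a hypothetical, trimmed-down stand-in for the class above (only `put` and `swap`, no `Map` interface), and the thread count, key ranges and 10 ms period are arbitrary assumptions. It also assumes the supplier hands out a thread-safe map, since several writers can be inside `put` at the same time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical, trimmed-down stand-in for the double-buffered map idea.
final class SwappableBuffer<K, V> {
    private final Supplier<Map<K, V>> mapSupplier;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile Map<K, V> map;

    SwappableBuffer(Supplier<Map<K, V>> mapSupplier) {
        this.mapSupplier = mapSupplier;
        this.map = mapSupplier.get();
    }

    // Writers share the read lock, so they do not block each other.
    V put(K key, V value) {
        lock.readLock().lock();
        try {
            return map.put(key, value);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Install a fresh map, then take and release the write lock so that
    // every put that might still target the old map has finished.
    Map<K, V> swap() {
        Map<K, V> old = map;
        map = mapSupplier.get();
        lock.writeLock().lock();
        lock.writeLock().unlock();
        return old;
    }
}

final class SwapDemo {
    public static void main(String[] args) throws Exception {
        SwappableBuffer<Integer, String> buffer = new SwappableBuffer<>(ConcurrentHashMap::new);
        AtomicInteger processed = new AtomicInteger();

        // The "timer job": swap periodically and process the old map (here: just count it).
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(
                () -> processed.addAndGet(buffer.swap().size()), 10, 10, TimeUnit.MILLISECONDS);

        // Four writers, each with its own key range so no key is written twice.
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int base = t * 10_000;
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    buffer.put(base + i, "v");
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            w.join();
        }

        timer.shutdown();
        timer.awaitTermination(1, TimeUnit.SECONDS);
        processed.addAndGet(buffer.swap().size()); // final drain of whatever is left
        System.out.println("processed " + processed.get() + " entries");
    }
}
```

Because each swapped-out map is only ever handed to the timer thread, the processing step itself needs no further locking in this sketch.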
Upvotes: 0
Reputation: 1454
I am not sure whether you need to keep all the data in the map, or whether the data already processed by the timer job is no longer needed there.
If you only need something like a snapshot for the timer job, you can replace the map with a new one like this:
private volatile ConcurrentHashMap map;

public void processByTimerJob() {
    ConcurrentHashMap oldMap = this.map;
    this.map = new ConcurrentHashMap(); // everything new will be stored in the new map
    oldMap.forEach(..... // process the old map via iteration or whatever you want
}
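A slightly fuller version of the same snapshot idea might look like the sketch below. The class name `SnapshotHolder`, the `String`/`Integer` types and the method names are made up for illustration, and `AtomicReference.getAndSet` is used here instead of the volatile field so that the read and the replacement happen as one atomic step. One caveat of the plain swap: a writer that fetched the old map reference just before the swap can still put into it afterwards, so an entry may arrive after the timer job has started iterating (or even after it has finished). If that matters, some coordination with the writers, such as the read/write lock in the other answer, is needed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder illustrating the "swap in a new map" snapshot approach.
final class SnapshotHolder {
    private final AtomicReference<ConcurrentHashMap<String, Integer>> current =
            new AtomicReference<>(new ConcurrentHashMap<>());

    // Called by the writer threads.
    void record(String key, int value) {
        current.get().put(key, value);
    }

    // Called by the timer job: atomically installs an empty map and returns the old one.
    Map<String, Integer> takeSnapshot() {
        return current.getAndSet(new ConcurrentHashMap<>());
    }

    public static void main(String[] args) {
        SnapshotHolder holder = new SnapshotHolder();
        holder.record("a", 1);
        holder.record("b", 2);

        Map<String, Integer> snapshot = holder.takeSnapshot();
        holder.record("c", 3); // lands in the new map, not in the snapshot

        // Process the snapshot; printing stands in for the real work.
        snapshot.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```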
Upvotes: 1