Reputation: 8811
I'm trying to understand the concurrency requirements for my application. Right now I'm storing in memory a Map with the recent updates for each user. The structure is the following:
Map<String, RecentUpdates> cacheMap; //Key is userId
class RecentUpdates {
public String userId; //the user
public List<EntityUpdate> recentUpdates;
}
class EntityUpdate {
public String timestamp; //The timestamp when the entity was updated
public String id; //The unique entity id
}
The following threads read/write in the map:
Single thread A:
Reads operations from a DB queue (MongoDB operation log). For each insert/update/remove operation:
Single thread B:
Iterates the cache map. If an entry has no recent updates in the last hour, deletes the entry
Multiple threads C to Z:
If the cache map contains recent updates for a given user, iterate the recent updates and retrieve those that occur later than a given timestamp.
The questions are:
1. Cache map concurrency
Which is the most appropriate data structure for the cache map? ConcurrentHashMap? Maybe a different one from Guava?
2. Recent updates list concurrency
Do I need to synchronize the recent updates List, given that only one thread adds items to it, or can I use ArrayList safely?
If I'm right, if thread A adds new elements while threads C-Z iterate the lists using Iterator, a ConcurrentModificationException will be thrown. But is it safe if I iterate the list using a for loop?
for(int i = 0; i < recentUpdates.size(); i++) {}
3. Distributed scenario
In a distributed scenario (threads C-Z in different web servers), can you recommend me a distributed cache solution (Hazelcast, Terracota...) according to my needs?
Many thanks
Upvotes: 2
Views: 496
Reputation: 13809
I made an attempt at a solution. This uses far more memory than what a solution using locks would, but provide better concurrency. Please see if this fits your needs.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
public class CacheMap {
public static class EntityUpdate {
public final String entityId;
public final long timestamp;
public final EntityUpdate previous;
public EntityUpdate(String entityId,
long timestamp, EntityUpdate previous) {
this.entityId = entityId;
this.timestamp = timestamp;
this.previous = previous;
}
public EntityUpdate cloneChangingPrevious(EntityUpdate newPrevious) {
return new EntityUpdate(entityId, timestamp, newPrevious);
}
}
public static class RecentUpdates {
public final String userId;
public final EntityUpdate lastUpdate;
public RecentUpdates(String userId, EntityUpdate lastUpdate) {
this.userId = userId;
this.lastUpdate = lastUpdate;
}
public RecentUpdates recordUpdate(String entityId, long timestamp) {
EntityUpdate update = new EntityUpdate(entityId,
timestamp, lastUpdate);
return new RecentUpdates(userId, update);
}
public RecentUpdates removeUpdatesOlderThan(long timestamp) {
Stack<EntityUpdate> recent = new Stack<EntityUpdate>();
EntityUpdate update = lastUpdate;
while (update != null) {
if (update.timestamp >= timestamp) {
recent.push(update);
} else {
break;
}
update = update.previous;
}
EntityUpdate last = null;
while (!recent.isEmpty()) {
last = recent.pop().cloneChangingPrevious(last);
}
return new RecentUpdates(userId, last);
}
public boolean isEmpty() {
return lastUpdate == null;
}
public List<EntityUpdate> getUpdatesSince(long timestamp) {
List<EntityUpdate> list = new ArrayList<EntityUpdate>();
EntityUpdate update = lastUpdate;
while (update != null) {
if (update.timestamp >= timestamp) {
list.add(update);
} else {
break;
}
update = update.previous;
}
return Collections.unmodifiableList(list);
}
}
private final ConcurrentHashMap<String, RecentUpdates> map =
new ConcurrentHashMap<String, RecentUpdates>();
// called by thread A
public void recordUpdate(String userId, String entityId) {
boolean done = false;
while (!done) {
RecentUpdates updates = map.get(userId);
if (updates == null) {
// looks like there is no mapping for this userId,
// make an effort to insert a new one
map.putIfAbsent(userId, new RecentUpdates(userId, null));
}
// query the map again
updates = map.get(userId);
// updates could still be null, because the entry might have
// been removed from the map by now; if so, retry
if (updates != null) {
long newTimestamp = System.currentTimeMillis();
RecentUpdates newVal =
updates.recordUpdate(entityId, newTimestamp);
done = map.replace(userId, updates, newVal);
}
}
}
// called by thread B
public void removeUpdatesOlderThan(long timestamp) {
for (String userId : map.keySet()) {
boolean done = false;
while (!done) {
// updates will always be non-null,
// because only this thread can
// remove an entry from the map
RecentUpdates updates = map.get(userId);
RecentUpdates purgedUpdates =
updates.removeUpdatesOlderThan(timestamp);
if (purgedUpdates.isEmpty()) {
// remove from the map, if now new insert has
// happened in the interim
done = map.remove(userId, updates);
} else {
// replace with the purged value, if no new
// insert has happened in the interim
done = map.replace(userId, updates, purgedUpdates);
}
}
}
}
// called by threads C-Z
public List<EntityUpdate> getUpdatesSince(String userId, long timestamp) {
RecentUpdates updates = map.get(userId);
if (updates == null) {
return Collections.EMPTY_LIST;
} else {
return updates.getUpdatesSince(timestamp);
}
}
}
Upvotes: 2
Reputation: 33956
1 ConcurrentHashMap
will work, but you'll need to use putIfAbsent
, which means you'll have to construct possibly unnecessary RecentUpdate
objects (and if you're not careful, unnecessary List objects). Alternatively, use synchronized get/put:
synchronized RecentUpdates getOrCreateRecentUpdates(String key) {
RecentUpdates recentUpdates = map.get(key);
if (recentUpdates == null) {
recentUpdates = new RecentUpdates();
map.put(key, recentUpdates);
}
return recentUpdates;
}
2 Can multiple threads access the list? If yes, then you need some synchronization. ArrayList
doesn't synchronize by default. It is not safe to use .size()
. Without a memory barrier (synchronization, volatile, etc.), you can't guarantee that another thread will ever see that the list has been updated.
I have no experience with #3.
Upvotes: 3
Reputation: 4706
For 1 & 2 you could use Guava's MapMaker to construct your cache:
Map<String, RecentUpdates> cacheMap = new MapMaker().expireAfterAccess(1, TimeUnit.HOURS).makeComputingMap(new Function<String, RecentUpdates>() {
public RecentUpdates apply(String user) {
return create(user); //whatever your impl is
}
}
This map is guaranteed to call the init function once and only once for each get of a unique key (if there are concurrent gets for the same key the others block and wait).
This will work quite well as long as all accesses to RecentUpdates are done through there and you don't retain references to them.
However, you seem to want to have an ephemeral (mutable) RecentUpdates, which could have the problem that it gets updated outside this structure. You could then have updates to the RecentUpdates structure that do not reset the expiration in the cache.
A way around the above would be to have an immutable RecentUpdates that you replace in the a cache that is a ConcurrentMap
while (true) {
RecentUpdates old = map.get(key);
RecentUpdates updated = update(old); // copy
if((old == null)
? map.putIfAbsent(key, value) == null
: map.replace(key, old, value)) {
return updated;
}
}
This means the map will not have any race-conditions around expiration.
As far as 3. goes there are many considerations that go in to such a decision. Who need this cache anyway? Is it only for the current user and therefore you might consider some other type of cache in the first place.
Upvotes: 3