Reputation: 757
I connect to an external service through an interactive session and a private feed (an InputStream), which run on separate threads. On the interactive session I send outgoing messages and receive synchronous responses: objects with several fields, including an ID and a 'status' confirming success or failure. At the same time I receive messages on the private feed for the same ID with further 'status' updates. I currently store the status per ID in a ConcurrentHashMap. It is imperative that I keep the events on these objects in the correct sequence, but I am getting race conditions: sometimes I process and apply an update from the private feed before I have received and processed the synchronous response on the interactive session, which leaves me with an obsolete, incorrect status for the ID.
Ideally, I would like some type of collection with a PutIfKeyExistOrWait method (with a timeout) that updates the value only if the key exists and otherwise waits; I could use it when processing objects from the private feed.
Does anyone know if there is a suitable collection available or can suggest an alternative solution to my problem? Thanks.
Upvotes: 4
Views: 306
Reputation: 5326
You already have a ConcurrentHashMap, say iDAndStatus, that stores each ID and its latest status. However, I would let only the thread that deals with the service (the interactive session) create new entries in that map.
When a message arrives from the feed and the ID already exists in iDAndStatus, it just modifies the status. If the key does not exist, store the ID/status update temporarily in another data structure, pendingFeedUpdates.
Every time a new entry is created in iDAndStatus, check pendingFeedUpdates to see whether any updates for the new ID are waiting, and apply them in order.
I'm not sure what synchronized data structure to use for pendingFeedUpdates: you need to retrieve by ID, but you might have many messages for each ID, and you want to keep the order of the messages. Maybe a synchronized HashMap that associates each ID with some type of synchronized, ordered Queue?
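A minimal sketch of the buffering idea, under some assumptions of mine: the class and method names are hypothetical, statuses are plain Strings, and a single lock guards both maps instead of separate synchronized collections, so that "check whether the ID exists, then update or park" happens as one atomic step.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch: feed updates that arrive early are parked per ID and replayed in arrival order. */
final class BufferedStatusMap {
    // Both maps are guarded by this object's monitor (all methods are synchronized).
    private final Map<Long, String> idAndStatus = new HashMap<>();
    private final Map<Long, Queue<String>> pendingFeedUpdates = new HashMap<>();

    /** Interactive-session thread: creates the entry, then replays any parked feed updates. */
    public synchronized void onInteractiveResponse(long id, String status) {
        idAndStatus.put(id, status);
        Queue<String> parked = pendingFeedUpdates.remove(id);
        if (parked != null) {
            for (String update : parked) { // ArrayDeque iterates in insertion order
                idAndStatus.put(id, update);
            }
        }
    }

    /** Feed thread: updates a known ID, otherwise parks the update until the entry exists. */
    public synchronized void onFeedUpdate(long id, String status) {
        if (idAndStatus.containsKey(id)) {
            idAndStatus.put(id, status);
        } else {
            pendingFeedUpdates.computeIfAbsent(id, k -> new ArrayDeque<>()).add(status);
        }
    }

    /** Returns null until the interactive session has created the entry. */
    public synchronized String getStatus(long id) {
        return idAndStatus.get(id);
    }
}
```

Nothing blocks in this version, so a slow synchronous response for one ID does not hold up feed messages for other IDs. The trade-off is that parked updates for an ID whose response never arrives stay in pendingFeedUpdates until something cleans them up.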
Upvotes: 0
Reputation: 242716
You can try to encapsulate the logic for handling this situation in the values of your map. The solution below relies on the atomicity of putIfAbsent(). Something like this:
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class StatusMap {
        private final Map<Long, StatusHolder> map = new ConcurrentHashMap<Long, StatusHolder>();

        // Returns null until the interactive session has supplied the initial status.
        public Status getStatus(long id) {
            StatusHolder holder = map.get(id);
            if (holder == null || holder.isIncomplete()) {
                return null;
            } else {
                return holder.getStatus();
            }
        }

        public void newStatusFromInteractiveSession(long id, Status status) {
            StatusHolder existing = map.putIfAbsent(id, StatusHolder.newComplete(status));
            if (existing != null) {
                existing.makeComplete(status); // Holder already exists, complete it
            }
        }

        // Blocks if the feed message arrives before the synchronous response.
        public void newStatusFromFeed(long id, Status status) throws InterruptedException {
            StatusHolder incomplete = StatusHolder.newIncomplete();
            StatusHolder holder = map.putIfAbsent(id, incomplete);
            if (holder == null) {
                holder = incomplete; // New holder added, wait for its completion
                holder.waitForCompletion();
            }
            holder.updateStatus(status);
        }
    }

    public class StatusHolder {
        private volatile Status status;
        private volatile boolean incomplete;
        private final Object lock = new Object();

        private StatusHolder(Status status, boolean incomplete) { ... }

        public static StatusHolder newComplete(Status status) {
            return new StatusHolder(status, false);
        }

        public static StatusHolder newIncomplete() {
            return new StatusHolder(null, true);
        }

        public boolean isIncomplete() { return incomplete; }

        // Called from the interactive session; wakes up a waiting feed thread.
        public void makeComplete(Status status) {
            synchronized (lock) {
                this.status = status;
                incomplete = false;
                lock.notifyAll();
            }
        }

        // Object.wait() throws the checked InterruptedException, so it must be declared.
        public void waitForCompletion() throws InterruptedException {
            synchronized (lock) {
                while (incomplete) lock.wait(); // loop guards against spurious wakeups
            }
        }

        ...
    }
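The question also asks for a timeout, which waitForCompletion() above does not have. One way to bound the wait, as a rough sketch rather than a drop-in replacement, is to give each holder a CountDownLatch and use its timed await. The class name, the method names and the use of String for the status are my own assumptions here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: "put if key exists, else wait up to a timeout" built on a per-ID latch. */
final class TimedStatusMap {
    private static final class Holder {
        final CountDownLatch created = new CountDownLatch(1); // opened by the interactive session
        volatile String status;
    }

    private final ConcurrentMap<Long, Holder> map = new ConcurrentHashMap<>();

    /** Interactive-session thread: stores the initial status and releases any waiting feed thread. */
    public void putFromInteractiveSession(long id, String status) {
        Holder h = map.computeIfAbsent(id, k -> new Holder());
        h.status = status;
        h.created.countDown();
    }

    /** Feed thread: returns false if the key did not appear within the timeout (or on interrupt). */
    public boolean putIfKeyExistsOrWait(long id, String status, long timeout, TimeUnit unit) {
        Holder h = map.computeIfAbsent(id, k -> new Holder()); // placeholder if the feed is first
        try {
            if (!h.created.await(timeout, unit)) {
                return false; // timed out; the caller decides what to do with the update
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        h.status = status;
        return true;
    }

    /** Returns null until the interactive session has supplied the initial status. */
    public String getStatus(long id) {
        Holder h = map.get(id);
        return (h == null || h.created.getCount() > 0) ? null : h.status;
    }
}
```

Because the feed write only happens after await() returns, it always lands after the interactive session's write for the same ID. Note that a timed-out call leaves an empty placeholder in the map, which you may want to remove or retry against depending on your needs.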
Upvotes: 2
Reputation: 1979
I would suggest you look at the synchronized collection wrappers in Collections, such as Collections.synchronizedList: http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Collections.html#synchronizedList%28java.util.List%29
This might solve your problem. The other option, depending on how the calls are made, is to make the method a synchronized method, which allows thread-safe execution and makes the whole operation atomic. See http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
The third option is to enforce concurrency control within the application, following an optimistic or pessimistic approach depending on what you are trying to achieve. This is the most complex of the three, but it gives you the most control when coupled with the previous options.
This is really dependent on your specific implementation.
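To make the synchronized-method option a bit more concrete, here is a rough sketch using plain synchronized methods with Object.wait(long) and notifyAll(). The class name, the method names, the String status and the millisecond timeout are all illustrative assumptions, not something taken from the question.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: synchronized methods plus a guarded, timed wait on the map's owner. */
final class MonitorStatusMap {
    private final Map<Long, String> statuses = new HashMap<>(); // guarded by "this"

    /** Interactive-session thread: creates or overwrites the entry and wakes any waiters. */
    public synchronized void put(long id, String status) {
        statuses.put(id, status);
        notifyAll();
    }

    /** Feed thread: waits until the key exists, giving up after timeoutMillis or on interrupt. */
    public synchronized boolean putIfKeyExistsOrWait(long id, String status, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!statuses.containsKey(id)) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return false; // timed out (also avoids wait(0), which would wait forever)
            }
            try {
                wait(remainingMillis); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        statuses.put(id, status);
        return true;
    }

    public synchronized String get(long id) {
        return statuses.get(id);
    }
}
```

The existence check and the update sit inside one synchronized method, which is the part a synchronized collection wrapper alone does not give you: the wrapper makes each individual call atomic, not a check followed by a put.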
Upvotes: 0