hgus1294

Reputation: 757

Java Concurrency in put/get in collections

I connect to an external service with an interactive session + a private feed (InputStream) that run on separate threads. On the interactive session, I send outgoing messages and receive synchronous responses with an object containing different fields, one being an ID and a 'status' confirming success or failure. Simultaneously I receive messages on the private feed for this ID with further 'status' updates. I currently store information about the status per ID in a ConcurrentHashMap. It is imperative that I keep a correct sequence of events on these objects but I am currently getting race conditions where I sometimes process and update the objects on the private feed before I receive and process the synchronous response on the interactive session, hence leaving me with an obsolete and incorrect status for the ID.

Ideally, I would like some type of collection with a PutIfKeyExistOrWait method (with a timeout) that updates the value only if the key exists and otherwise waits. I could then use it when processing objects on the private feed.

Does anyone know if there is a suitable collection available or can suggest an alternative solution to my problem? Thanks.

Upvotes: 4

Views: 306

Answers (3)

toto2

Reputation: 5326

You already have some ConcurrentHashMap iDAndStatus that stores the ID and latest status. However, I would only let the thread that deals with the service create a new entry in that map.

When a message arrives from the feed and the ID already exists in iDAndStatus, the feed thread just modifies the status. If the key does not exist, it temporarily stores the ID/status update in some other data structure, pendingFeedUpdates.

Every time a new entry is created in iDAndStatus, check pendingFeedUpdates to see whether any updates for the new ID are present.

I'm not sure what synchronized data structure to use for pendingFeedUpdates: you need to retrieve by ID, but you might have many messages for each ID, and you want to keep the order of the messages. Maybe a synchronized HashMap that associates each ID with some type of synchronized ordered Queue?
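
The buffering scheme above could be sketched roughly as follows. This is only a minimal illustration under some assumptions: the class and method names (BufferedStatusMap, onSessionResponse, onFeedUpdate) are hypothetical, String stands in for the real status type, and a single lock guards both maps instead of two separately synchronized structures, so that the "check the map, then buffer" step cannot interleave with entry creation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical names throughout; String stands in for the real status type.
class BufferedStatusMap {
    private final Map<Long, String> idAndStatus = new HashMap<>();
    private final Map<Long, Queue<String>> pendingFeedUpdates = new HashMap<>();

    // Called by the interactive-session thread: only this path creates entries.
    public synchronized void onSessionResponse(long id, String status) {
        idAndStatus.put(id, status);
        Queue<String> pending = pendingFeedUpdates.remove(id);
        if (pending != null) {
            // Replay buffered feed updates in arrival order; the last one wins.
            for (String update : pending) {
                idAndStatus.put(id, update);
            }
        }
    }

    // Called by the feed thread: update known IDs, buffer unknown ones.
    public synchronized void onFeedUpdate(long id, String status) {
        if (idAndStatus.containsKey(id)) {
            idAndStatus.put(id, status);
        } else {
            pendingFeedUpdates.computeIfAbsent(id, k -> new ArrayDeque<>()).add(status);
        }
    }

    public synchronized String getStatus(long id) {
        return idAndStatus.get(id);
    }
}
```

With this shape the feed thread never blocks. The cost is that buffered updates for an ID that the session never confirms stay in pendingFeedUpdates until something evicts them, which this sketch does not attempt.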

Upvotes: 0

axtavt

Reputation: 242716

You can try to encapsulate logic for handling this situation into values of your map, something like this:

  • If feed thread is the first to add a value for particular id, that value is considered incomplete and thread waits until it's completed
  • If interactive session thread isn't the first to add a value, it marks that incomplete value as complete
  • Incomplete values are treated as absent when getting them from the map

This solution is based on atomicity of putIfAbsent().

public class StatusMap {
    private Map<Long, StatusHolder> map = new ConcurrentHashMap<Long, StatusHolder>();

    public Status getStatus(long id) {
        StatusHolder holder = map.get(id);
        if (holder == null || holder.isIncomplete()) {
            return null;
        } else {
            return holder.getStatus();
        }
    }

    public void newStatusFromInteractiveSession(long id, Status status) {
        StatusHolder holder = StatusHolder.newComplete(status);
        if ((holder = map.putIfAbsent(id, holder)) != null) {
            holder.makeComplete(status); // Holder already exists, complete it
        } 
    }

    public void newStatusFromFeed(long id, Status status) {
        StatusHolder incomplete = StatusHolder.newIncomplete();
        StatusHolder holder = null;
        if ((holder = map.putIfAbsent(id, incomplete)) == null) {
            holder = incomplete; // New holder added, wait for its completion
            holder.waitForCompletion();
        }
        holder.updateStatus(status);
    }
}

public class StatusHolder {
    private volatile Status status;
    private volatile boolean incomplete;
    private Object lock = new Object();

    private StatusHolder(Status status, boolean incomplete) { ... }

    public static StatusHolder newComplete(Status status) {
        return new StatusHolder(status, false);
    }

    public static StatusHolder newIncomplete() {
        return new StatusHolder(null, true);
    }

    public boolean isIncomplete() { return incomplete; }

    public void makeComplete(Status status) {
        synchronized (lock) {
            this.status = status;
            incomplete = false;
            lock.notifyAll();
        }
    }

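    public void waitForCompletion() {
        synchronized (lock) {
            try {
                // Loop guards against spurious wakeups
                while (incomplete) lock.wait();
            } catch (InterruptedException e) {
                // wait() throws a checked exception: restore the flag and stop waiting
                Thread.currentThread().interrupt();
            }
        }
    }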
    ...
}
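
Since the question asks for a wait with a timeout, here is a rough sketch of the same "incomplete holder" idea with a bounded wait. It swaps the wait/notify pair for a java.util.concurrent.CountDownLatch, which is a substitution and not part of the code above. TimedStatusMap, Holder, fromFeed and fromInteractiveSession are hypothetical names, String stands in for Status, and Java 8+ is assumed for computeIfAbsent with a lambda. It also assumes a single feed thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; String stands in for the real Status type.
class TimedStatusMap {
    // The latch opens once the interactive session has confirmed the ID.
    private static final class Holder {
        final CountDownLatch confirmed = new CountDownLatch(1);
        volatile String status;
    }

    private final ConcurrentMap<Long, Holder> map = new ConcurrentHashMap<>();

    // Unconfirmed holders are treated as absent.
    public String getStatus(long id) {
        Holder h = map.get(id);
        return (h == null || h.confirmed.getCount() > 0) ? null : h.status;
    }

    public void fromInteractiveSession(long id, String status) {
        Holder h = map.computeIfAbsent(id, k -> new Holder());
        h.status = status;       // write first ...
        h.confirmed.countDown(); // ... then release any waiting feed thread
    }

    // Returns false if the ID was not confirmed within the timeout
    // (or the thread was interrupted); the update is then NOT applied.
    public boolean fromFeed(long id, String status, long timeout, TimeUnit unit) {
        Holder h = map.computeIfAbsent(id, k -> new Holder());
        try {
            if (!h.confirmed.await(timeout, unit)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        h.status = status;
        return true;
    }
}
```

What to do when fromFeed returns false (retry, buffer, or log and drop) is left to the caller; the sketch leaves the unconfirmed holder in the map so a late session response can still complete it.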

Upvotes: 2

Stainedart

Reputation: 1979

I would suggest you look at the synchronized wrappers in Collections, such as Collections.synchronizedList: http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Collections.html#synchronizedList%28java.util.List%29

This might solve your problem. The other option, depending on how the calls are made, is to make the method a synchronized method, which allows thread-safe execution and ensures the atomicity of the operation. See http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html

The third option is to enforce concurrency control within the application, following an optimistic or a pessimistic approach depending on what you are trying to achieve. This is the most complex of the three options but will give you the most control if coupled with the previous ones.
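
As one possible reading of the optimistic approach, the feed thread can attempt the update and find out afterwards whether it applied. ConcurrentMap.replace(key, value) only stores the value if the key is currently mapped, which covers the "put if key exists" half of what the question asks for. The sketch below is an assumption-laden illustration: OptimisticFeedUpdater, register and tryUpdate are hypothetical names, and String stands in for the status type.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; String stands in for the real status type.
class OptimisticFeedUpdater {
    private final ConcurrentMap<Long, String> statusById = new ConcurrentHashMap<>();

    // Interactive-session thread: the only place where keys are created.
    public void register(long id, String status) {
        statusById.put(id, status);
    }

    // Feed thread: atomically replaces the status only if the key exists.
    // Returns false when the ID is unknown, so the caller can retry or buffer.
    public boolean tryUpdate(long id, String status) {
        return statusById.replace(id, status) != null;
    }

    public String get(long id) {
        return statusById.get(id);
    }
}
```

A false return from tryUpdate tells the feed thread that the synchronous response has not been processed yet; whether it then retries after a delay or queues the update depends on the specific implementation.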

This is really dependent on your specific implementation.

Upvotes: 0
