Kami

Reputation: 1109

Synchronize multiple reads and writes in a list of thread safe objects

Let's say that I have a thread safe class Container:

public class Container {
  private List<MyObject> objects;
  ...
  public synchronized MyObject take(Type t) {...} // removes and returns an object of type t, or null if there is none
  public synchronized void put(MyObject o) {...} // stores an object
}

Important: MyObject has a method getType() that returns its type.

Somewhere, I have a list of containers (shared among threads):

List<Container> containers;

And I have some threads executing something like this:

//This piece of code is not synchronized
for(Container c : containers) {
  Type t = getRandomType();
  MyObject o = c.take(t); //note that this is synchronized
  if(o != null) { //if I found an object of the desired type
    //do some important stuff
    return;
  }
}
waitingReaders.putMyself(); //wait for the object of the right type

I want to loop over every container to take an object of the right type.

Then I have other threads doing something like this:

//This piece of code is not synchronized
Container c = getRandomContainer(); //from containers
Type t = getRandomType();
MyObject o = new MyObject(t); //creates an object of a random type
if(waitingReaders.containSomeoneWaitingForThisType(t)) {
  waitingReaders.givesObjectToHim(o);
  return;
}
else
  c.put(o); //note that this is synchronized

EDITED: if a reader does not find the desired object of type t, it registers itself in the waitingReaders structure and waits for it (the implementation of that structure does not matter). If a writer finds a reader waiting for the type of object it has just generated, it hands the object to that reader instead of putting it in the container.

The problem is that a writer thread could put an object of type t into a container after a reader thread has already examined that container in its for loop. The reader would then miss that object even though it is of the right type t, because it is already further along in the loop.
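
To make the interleaving concrete, it can be replayed step by step on a single thread. This is only a sketch under simplifying assumptions: the containers and waitingReaders are modeled as plain lists of type names, and the class name MissedObjectDemo is hypothetical, not part of the real code.

```java
import java.util.ArrayList;
import java.util.List;

final class MissedObjectDemo {
    // Replays one unlucky interleaving of a reader and a writer, in order.
    static String replay() {
        // Two containers, modeled as lists of type names (simplification).
        List<List<String>> containers = new ArrayList<>();
        containers.add(new ArrayList<>());
        containers.add(new ArrayList<>());
        List<String> waitingReaders = new ArrayList<>();
        String wanted = "A";

        // Reader: examines container 0 and finds nothing.
        boolean found = containers.get(0).remove(wanted);

        // Writer: nobody is waiting yet, so the new object goes into container 0.
        if (!waitingReaders.contains(wanted)) {
            containers.get(0).add(wanted);
        }

        // Reader: examines container 1 and finds nothing there either.
        found = found || containers.get(1).remove(wanted);

        // Reader: gives up and registers as waiting, although the object exists.
        if (!found) {
            waitingReaders.add(wanted);
        }

        return "container0=" + containers.get(0) + " waiting=" + waitingReaders;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints "container0=[A] waiting=[A]"
    }
}
```

At the end, the object sits in the first container while the reader is waiting for exactly that type.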

At the same time, I do not want to lock around the whole for loop, because I also need reads to stay concurrent.

How would you handle the described scenario given these constraints?

Upvotes: 1

Views: 265

Answers (1)

Holger

Reputation: 298103

If I understand you correctly, you want a solution where multiple threads may store MyObject instances and multiple threads may retrieve MyObject instances of a particular type, all non-blocking, with the only exception that when no MyObject instance of the requested type is available, the retrieving thread may block until a new instance of that type becomes available.

Don’t try to implement a storage solution yourself. Use the existing concurrency tools:

final ConcurrentHashMap<Type, BlockingQueue<MyObject>> map=new ConcurrentHashMap<>();
/**
 * Get an object of {@code Type}, blocking if necessary
 */
MyObject getObjectOf(Type t) throws InterruptedException {
    return map.computeIfAbsent(t, x->new LinkedBlockingQueue<>()).take();
}
/**
 * Store an object, never blocking.
 */
void putObject(MyObject o) {
    map.computeIfAbsent(o.getType(), x->new LinkedBlockingQueue<>()).add(o);
}

This uses Java 8. If you don’t have Java 8, you have to emulate the computeIfAbsent operation:

final ConcurrentHashMap<Type, BlockingQueue<MyObject>> map=new ConcurrentHashMap<>();
/**
 * Get an object of {@code Type}, blocking if necessary
 */
MyObject getObjectOf(Type t) throws InterruptedException {
    return getQueue(t).take();
}
/**
 * Store an object, never blocking.
 */
void putObject(MyObject o) {
    getQueue(o.getType()).add(o);
}
private BlockingQueue<MyObject> getQueue(Type key) {
    BlockingQueue<MyObject> q=map.get(key);
    if(q!=null) return q;
    BlockingQueue<MyObject> newQueue=new LinkedBlockingQueue<>();
    q=map.putIfAbsent(key, newQueue);
    return q==null? newQueue: q;
}

This uses an unbounded LinkedBlockingQueue per Type so the only situation where a thread gets blocked is when a thread tries to retrieve an item from an empty queue.
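
For illustration, here is a small self-contained sketch of how the map-of-queues approach could be exercised. The Type enum, the minimal MyObject class, and the names TypedStore, TypedStoreDemo and runDemo are assumptions made up for this example; only getObjectOf and putObject mirror the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for the question's Type and MyObject.
enum Type { A, B }

final class MyObject {
    private final Type type;
    MyObject(Type type) { this.type = type; }
    Type getType() { return type; }
}

final class TypedStore {
    private final ConcurrentHashMap<Type, BlockingQueue<MyObject>> map =
            new ConcurrentHashMap<>();

    // One unbounded queue per type, created atomically on first use.
    private BlockingQueue<MyObject> queueFor(Type t) {
        return map.computeIfAbsent(t, x -> new LinkedBlockingQueue<>());
    }

    // Blocks until an object of the requested type is available.
    MyObject getObjectOf(Type t) throws InterruptedException {
        return queueFor(t).take();
    }

    // Never blocks, because the queues are unbounded.
    void putObject(MyObject o) {
        queueFor(o.getType()).add(o);
    }
}

final class TypedStoreDemo {
    // Starts a reader waiting for Type.B, then stores an A followed by a B.
    static Type runDemo() {
        TypedStore store = new TypedStore();
        MyObject[] received = new MyObject[1];

        Thread reader = new Thread(() -> {
            try {
                received[0] = store.getObjectOf(Type.B);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        store.putObject(new MyObject(Type.A)); // wrong type, reader keeps waiting
        store.putObject(new MyObject(Type.B)); // wakes the reader if it is blocked

        try {
            reader.join(); // join() makes the reader's write to received[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0].getType();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "B"
    }
}
```

Because every type has its own queue, the reader cannot miss the object regardless of whether it starts waiting before or after the writer stores it.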

Upvotes: 1
