Reputation: 1109
Let's say that I have a thread safe class Container:
public class Container {
private List<MyObject> objects;
...
public synchronized MyObject take(Type t) {...} //takes an object of type t
public synchronized void put(MyObject o) {...} //put an object
}
Important: MyObject has a method getType() that returns a his type
Somewhere, I have a list of containers (shared amond threads):
List<Container> containers;
And I have some threads executing something like this:
//This piece of code is not synchronized
for(Container c : containers) {
Type t = getRandomType();
MyObject o = c.getObject(t); //note that this is synchronized
if(o != null) { //if I found an object of the desired type
//do some important stuff
return;
}
}
waitingReaders.putMyself(); //wait for the object of the right type
I want to loop over every containers to take an object of the right type.
Then I have other threads doing something like this:
//This piece of code is not synchronized
Container c = getRandomContainer(); //from containers
Type t = getRandomType();
MyObject o = new MyObject(t); //creates an object of a random type
if(waitingReaders.containSomeoneWaitingForThisType(t)) {
waitingReaders.givesObjectToHim(o);
return;
}
else
c.put(o); //note that this is synchronized
EDITED: if a reader does not find the desired object of type t, he puts himself waiting for it inside the structure waitingReaders (it does not matter his implementation). If a writer find a reader waiting for the generated object, then he gives it to the waiting reader (instead of putting in the container).
The problem is that, writer threads could put an object of type t inside a certain container after a reader thread has already analysed that container in the for loop. So the reader would miss that object if it was of the right type t (it would be already too ahead in the loop).
At the same time, I do not want to lock over the for loop because I also need to keep concurrent reads.
How would you handle the described scenario given these constraints?
Upvotes: 1
Views: 265
Reputation: 298103
If I understand you correctly, you want a solution where multiple threads may store MyObject
instances and multiple threads may retrieve MyObject
instances of a particular type, all non-blocking with the only exception that when no MyObject
instances of a requested type is available, the retrieving thread may get blocked until a new instance of the requested type is available.
Don’t try to implement a storage solution yourself. Use the existing concurrency tools:
final ConcurrentHashMap<Type, BlockingQueue<MyObject>> map=new ConcurrentHashMap<>();
/**
* Get a object of {@code Type}, blocking if necessary
*/
MyObject getObjectOf(Type t) throws InterruptedException {
return map.computeIfAbsent(t, x->new LinkedBlockingQueue<>()).take();
}
/**
* Store an object, never blocking.
*/
void putObject(MyObject o) {
map.computeIfAbsent(o.getType(), x->new LinkedBlockingQueue<>()).add(o);
}
This uses Java 8. If you don’t have Java 8, you have to emulate the computeIfAbsent
operation:
final ConcurrentHashMap<Type, BlockingQueue<MyObject>> map=new ConcurrentHashMap<>();
/**
* Get a object of {@code Type}, blocking if necessary
*/
MyObject getObjectOf(Type t) throws InterruptedException {
return getQueue(t).take();
}
/**
* Store an object, never blocking.
*/
void putObject(MyObject o) {
getQueue(o.getType()).add(o);
}
private BlockingQueue<MyObject> getQueue(Type key) {
BlockingQueue<MyObject> q=map.get(key);
if(q!=null) return q;
BlockingQueue<MyObject> newQueue=new LinkedBlockingQueue<>();
q=map.putIfAbsent(key, newQueue);
return q==null? newQueue: q;
}
This uses an unbounded LinkedBlockingQueue
per Type
so the only situation where a thread gets blocked is when a thread tries to retrieve an item from an empty queue.
Upvotes: 1