Reputation: 865
I came across a Java concurrency problem which I thought I had solved but now I feel that My solution has a problem.
The problem:
Implement the missing methods in a thread safe and performant way. The system should subscribe only on the first request per T key
and should unsubscribe once no more listeners remain for a given key.
interface Listener {
void onData();
}
abstract class Publisher<T> {
public void subscribe(T key, Listener l) {
// TODO complete
}
public void unsubscribe(T key, Listener l) {
// TODO complete
}
public void publish(T key) {
// TODO complete
}
public abstract void reallyLongSubscribeRequest(T key);
public abstract void reallyLongUnsubscribeRequest(T key);
}
My solution was to store the the key's and listeners in a ConcurrentHashMap
and use a thread pool executor to run a call to the really long subscribe and unsubcribe methods:
abstract class Publisher<T> {
private final int POOL_SIZE = 5;
private final ConcurrentHashMap<T, List<Listener>> listeners = new ConcurrentHashMap<>();
private final ScheduledExecutorService stpe = Executors.newScheduledThreadPool(POOL_SIZE);
public void subscribe(T key, Listener l) {
if (listeners.containsKey(key)) {
listeners.get(key).add(l);
} else {
final T keyToAdd = key;
final List<Listener> list = new LinkedList<Listener>();
list.add(l);
Runnable r = new Runnable() {
public void run() {
reallyLongSubscribeRequest(keyToAdd);
listeners.putIfAbsent(keyToAdd, list);
}
};
stpe.execute(r);
}
}
public void unsubscribe(T key, Listener l) {
if (listeners.containsKey(key)) {
List<Listener> list = listeners.get(key);
list.remove(l);
if (list.size() == 0) {
final T keyToRemove = key;
Runnable r = new Runnable() {
public void run() {
reallyLongUnsubscribeRequest(keyToRemove);
}
};
stpe.execute(r);
}
}
}
public void publish(T key) {
if (listeners.containsKey(key)) {
final List<Listener> list = listeners.get(key);
for (Listener l : list) {
l.onData();
}
}
}
public abstract void reallyLongSubscribeRequest(T key);
public abstract void reallyLongUnsubscribeRequest(T key);
}
I'm now concerned that this is no longer thread safe because
In subscribe the active thread could be swapped out/have its timeslice ended between entering the false branch and executing the Runnable. If the next thread makes the same call (same key) then we will have two threads wanting to subscribe and write to the map. putIfAbsent
keeps the map consistent but the really long method will get called twice (this is bad if it changes the class state).
Similar to #1, in unssubscribe what if the thread is swapped out between entering the true branch of the nested if and executing the Runnable?
So my questions are
Upvotes: 2
Views: 209
Reputation: 7555
You've got two problems:
Your subscribe
, unsubscribe
and publish
methods should be synchronized
to make them thread safe.
You should have have just a single thread to do the reallyLong...()
calls that waits on a Queue
. You post to the Queue
a message telling is to do one or the other, and it does. The queue will ensure that they happen one after the other.
You also have a bug in your code. You only do the reallyLongSubscribeRequest(...)
when the key doesn't exist in the map, but you are not removing the key from the map when you remove the last listener.
Upvotes: 2
Reputation: 53694
You have a couple of problems. Your lists are not thread-safe, and you are correct, you could run a request multiple times. The ConcurrentHashMap is a nice way to get parallel but thread-safe map access. however, you need to implement some sort of "per-key" synchronization to ensure that the (un)subscribe operations happen correctly (not to mention the list updates).
Upvotes: 1