clicky

Reputation: 865

Thread Safe Publisher

I came across a Java concurrency problem that I thought I had solved, but now I suspect my solution has a problem.

The problem:

Implement the missing methods in a thread safe and performant way. The system should subscribe only on the first request per T key and should unsubscribe once no more listeners remain for a given key.

interface Listener {
    void onData();
}

abstract class Publisher<T> {       
    public void subscribe(T key, Listener l) {
        // TODO complete
    }

    public void unsubscribe(T key, Listener l) {
        // TODO complete
    }

    public void publish(T key) {
        // TODO complete
    }

    public abstract void reallyLongSubscribeRequest(T key);
    public abstract void reallyLongUnsubscribeRequest(T key);
}

My solution was to store the keys and listeners in a ConcurrentHashMap and use a thread pool executor to run the calls to the really long subscribe and unsubscribe methods:

abstract class Publisher<T> {
    private final int POOL_SIZE = 5;

    private final ConcurrentHashMap<T, List<Listener>> listeners = new ConcurrentHashMap<>();
    private final ScheduledExecutorService stpe = Executors.newScheduledThreadPool(POOL_SIZE);

    public void subscribe(T key, Listener l) {
        if (listeners.containsKey(key)) {
            listeners.get(key).add(l);
        } else {
            final T keyToAdd = key;
            final List<Listener> list = new LinkedList<Listener>();
            list.add(l);

            Runnable r = new Runnable() {
                public void run() {
                    reallyLongSubscribeRequest(keyToAdd);
                    listeners.putIfAbsent(keyToAdd, list);
                }
            };

            stpe.execute(r);
        }
    }

    public void unsubscribe(T key, Listener l) {
        if (listeners.containsKey(key)) {
            List<Listener> list = listeners.get(key);
            list.remove(l);

            if (list.size() == 0) {
                final T keyToRemove = key;
                Runnable r = new Runnable() {
                    public void run() {
                        reallyLongUnsubscribeRequest(keyToRemove);
                    }
                };
                stpe.execute(r);
            }
        }
    }

    public void publish(T key) {
        if (listeners.containsKey(key)) {
            final List<Listener> list = listeners.get(key);
            for (Listener l : list) {
                l.onData();
            }
        }
    }

    public abstract void reallyLongSubscribeRequest(T key);
    public abstract void reallyLongUnsubscribeRequest(T key);
}

I'm now concerned that this is no longer thread safe because

  1. In subscribe, the active thread could be swapped out (have its time slice end) between entering the false branch and executing the Runnable. If the next thread makes the same call (same key), then two threads will want to subscribe and write to the map. putIfAbsent keeps the map consistent, but the really long method will get called twice (this is bad if it changes the class state).

  2. Similar to #1: in unsubscribe, what if the thread is swapped out between entering the true branch of the nested if and executing the Runnable?

So my questions are

  1. Are my above concerns valid, or am I overcomplicating the matter (or have I misunderstood how time slicing works)?
  2. If yes, can this be easily fixed, or is there a much better/easier/simpler solution?

Upvotes: 2

Views: 209

Answers (2)

Jamie Cockburn

Reputation: 7555

You've got two problems:

  • Your subscribe, unsubscribe and publish methods should be synchronized to make them thread safe.

  • You should have just a single thread that does the reallyLong...() calls and waits on a Queue. You post a message to the Queue telling it to do one or the other, and it does. The queue will ensure that they happen one after the other.

You also have a bug in your code. You only do the reallyLongSubscribeRequest(...) when the key doesn't exist in the map, but you are not removing the key from the map when you remove the last listener.
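
A minimal sketch of the points above, under some assumptions: the class name QueuedPublisher and the shutdown helper are made up for illustration, Java 8+ is assumed for the lambdas, and Executors.newSingleThreadExecutor() stands in for "a single thread that waits on a Queue" (it is one worker thread fed by a FIFO queue). It also removes the key when the last listener goes away, so a later subscribe triggers a fresh request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

interface Listener {
    void onData();
}

// Hypothetical name; a sketch, not a definitive implementation.
abstract class QueuedPublisher<T> {
    // Guarded by "this": only touched inside synchronized code.
    private final Map<T, List<Listener>> listeners = new HashMap<>();
    // One worker thread with a FIFO queue, so requests run in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    public synchronized void subscribe(T key, Listener l) {
        List<Listener> list = listeners.get(key);
        if (list == null) {
            // First listener for this key: register it and enqueue the long request.
            list = new ArrayList<>();
            listeners.put(key, list);
            worker.execute(() -> reallyLongSubscribeRequest(key));
        }
        list.add(l);
    }

    public synchronized void unsubscribe(T key, Listener l) {
        List<Listener> list = listeners.get(key);
        if (list != null && list.remove(l) && list.isEmpty()) {
            // Last listener gone: drop the key so a later subscribe starts over.
            listeners.remove(key);
            worker.execute(() -> reallyLongUnsubscribeRequest(key));
        }
    }

    public void publish(T key) {
        List<Listener> snapshot;
        synchronized (this) {
            List<Listener> list = listeners.get(key);
            if (list == null) {
                return;
            }
            snapshot = new ArrayList<>(list);
        }
        // Call listeners outside the lock so a slow listener does not block subscribers.
        for (Listener l : snapshot) {
            l.onData();
        }
    }

    // Illustrative helper: lets queued requests finish, then stops the worker.
    public boolean shutdown(long timeoutMillis) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public abstract void reallyLongSubscribeRequest(T key);
    public abstract void reallyLongUnsubscribeRequest(T key);
}
```

One design choice to be aware of: listeners are registered before the long subscribe request has finished, so publish can reach them while that request is still queued. If that is not acceptable, the registration would have to move into the queued task.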

Upvotes: 2

jtahlborn

Reputation: 53694

You have a couple of problems. Your lists are not thread-safe, and you are correct: you could run a request multiple times. The ConcurrentHashMap is a nice way to get parallel but thread-safe map access. However, you need to implement some sort of "per-key" synchronization to ensure that the (un)subscribe operations happen correctly (not to mention the list updates).
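
One possible shape for that per-key synchronization, as a sketch only: it assumes Java 8+, where ConcurrentHashMap.compute and computeIfPresent run their function atomically for a given key. The class name PerKeyPublisher, the STRIPES constant, and the shutdown helper are invented for illustration. The long requests are only enqueued inside the atomic section, not executed there, and each key always maps to the same single-threaded executor so its requests stay in order.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

interface Listener {
    void onData();
}

// Hypothetical name; a sketch under the assumptions stated above.
abstract class PerKeyPublisher<T> {
    private static final int STRIPES = 4; // arbitrary value for this sketch

    private final ConcurrentHashMap<T, List<Listener>> listeners = new ConcurrentHashMap<>();
    private final ExecutorService[] workers = new ExecutorService[STRIPES];

    protected PerKeyPublisher() {
        for (int i = 0; i < STRIPES; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // The same key always lands on the same worker, which keeps its requests ordered.
    private ExecutorService workerFor(T key) {
        return workers[Math.floorMod(key.hashCode(), STRIPES)];
    }

    public void subscribe(T key, Listener l) {
        // compute() is atomic per key: only one caller can see "no list yet".
        listeners.compute(key, (k, list) -> {
            if (list == null) {
                list = new CopyOnWriteArrayList<>();
                workerFor(k).execute(() -> reallyLongSubscribeRequest(k));
            }
            list.add(l);
            return list;
        });
    }

    public void unsubscribe(T key, Listener l) {
        listeners.computeIfPresent(key, (k, list) -> {
            if (list.remove(l) && list.isEmpty()) {
                workerFor(k).execute(() -> reallyLongUnsubscribeRequest(k));
                return null; // returning null removes the key from the map
            }
            return list;
        });
    }

    public void publish(T key) {
        List<Listener> list = listeners.get(key);
        if (list != null) {
            // CopyOnWriteArrayList iterates over a snapshot, so no lock is needed here.
            for (Listener l : list) {
                l.onData();
            }
        }
    }

    // Illustrative helper: lets queued requests finish, then stops the workers.
    public boolean shutdown(long timeoutMillis) throws InterruptedException {
        boolean done = true;
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        for (ExecutorService w : workers) {
            done &= w.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        return done;
    }

    public abstract void reallyLongSubscribeRequest(T key);
    public abstract void reallyLongUnsubscribeRequest(T key);
}
```

Running the long request directly inside compute would also be atomic per key, but it would block other updates that hit the same part of the map for the whole duration, which is why this sketch hands it to an executor instead.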

Upvotes: 1
