fizzer

Reputation: 13806

How do I tear down an observer relationship in multithreaded C++?

I have a Subject which offers Subscribe(Observer*) and Unsubscribe(Observer*) to clients. Subject runs in its own thread (from which it calls Notify() on subscribed Observers) and a mutex protects its internal list of Observers.

I would like client code - which I don't control - to be able to safely delete an Observer after it is unsubscribed. How can this be achieved?

Edit

Some illustrative code follows. The problem is how to prevent Unsubscribe happening while Run is at the 'Problem here' comment. Then I could call back on a deleted object. Alternatively, if I hold the mutex throughout rather than making the copy, I can deadlock certain clients.

#include <algorithm>
#include <functional>
#include <set>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

using namespace std;
using namespace boost;

class Observer
{
public:
    void Notify() {}
};

class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.insert(o);
    }

    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.erase(o);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            set<Observer*> notifyList;
            {
                mutex::scoped_lock l(m);
                notifyList = observers;
            }
            // Problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&Observer::Notify));
        }
    }

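private:
    void WaitForSomethingInterestingToHappen(); // placeholder, defined elsewhere

    set<Observer*> observers;
    mutex m;   // declared before t so it is constructed before the thread starts
    thread t;
};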

Edit

I can't Notify observers while holding the mutex because of the deadlock risk. The most obvious way this can happen - the client calls Subscribe or Unsubscribe from inside Notify - is easily remedied by making the mutex recursive. More insidious is the risk of intermittent deadlock on different threads.

I'm in a multithreaded environment, so at any point in a thread's execution, it will typically hold a sequence of locks L1, L2, ... Ln. Another thread will hold locks K1, K2, ... Km. A properly written client will ensure that different threads always acquire locks in the same order. But when clients interact with my Subject's mutex - call it X - this strategy is broken: calls to Subscribe / Unsubscribe acquire locks in the order L1, L2, ... Ln, X, while calls to Notify from my Subject thread acquire locks in the order X, K1, K2, ... Km. If any of the Li or Kj can coincide down any call path, the client suffers an intermittent deadlock, with little prospect of debugging it. Since I don't control the client code, I can't rule this out, so holding the mutex while notifying is not an option.

Upvotes: 11

Views: 2670

Answers (9)

Matthieu M.

Reputation: 300199

The "ideal" solution would involve using shared_ptr and weak_ptr. However, in order to be generic, it also has to account for the issue of the Subject being dropped before some of its Observers (yes, that can happen too).

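#include <memory>
#include <mutex>
#include <set>
#include <boost/noncopyable.hpp>

class Observer;

class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o);
    void Unsubscribe(std::weak_ptr<Observer> o);

private:
    std::mutex mutex;
    // weak_ptr has no operator<, so the set needs an owner-based ordering
    std::set< std::weak_ptr<Observer>,
              std::owner_less< std::weak_ptr<Observer> > > observers;
};

class Observer: boost::noncopyable {
public:
    ~Observer();

    void Notify();

private:
    std::mutex mutex;
    std::weak_ptr<Subject> subject;
};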

With this structure, we create a cyclic graph, but with a judicious use of weak_ptr so that both Observer and Subject can be destroyed without coordination.

Note: I have assumed, for simplicity, that an Observer observes a single Subject at a time, but it could easily observe multiple subjects.
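To make the weak_ptr idea concrete, here is a minimal sketch of one notification pass, assuming C++11 and that clients hold their observers in shared_ptr. The NotifyAll() method and the notified counter are hypothetical names added for illustration; they are not part of the interface above.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <set>
#include <vector>

class Observer {
public:
    void Notify() { ++notified; }
    int notified = 0;  // hypothetical counter, only here to make the effect visible
};

class Subject {
public:
    void Subscribe(const std::weak_ptr<Observer>& o) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(o);
    }

    void Unsubscribe(const std::weak_ptr<Observer>& o) {
        std::lock_guard<std::mutex> l(m);
        observers.erase(o);
    }

    // One notification pass; returns how many observers were notified.
    int NotifyAll() {
        std::vector<std::shared_ptr<Observer>> alive;
        {
            std::lock_guard<std::mutex> l(m);
            for (auto it = observers.begin(); it != observers.end();) {
                if (auto sp = it->lock()) {   // pin the observer for this pass
                    alive.push_back(sp);
                    ++it;
                } else {
                    it = observers.erase(it); // owner already dropped it
                }
            }
        }
        // The mutex is released here; the shared_ptr copies keep every
        // observer alive until the pass is over.
        for (auto& sp : alive) sp->Notify();
        return static_cast<int>(alive.size());
    }

private:
    std::mutex m;
    std::set<std::weak_ptr<Observer>,
             std::owner_less<std::weak_ptr<Observer>>> observers;
};
```

With this shape the client never calls delete at all: dropping its last shared_ptr is enough, and an observer that is mid-notification simply lives until the pass finishes.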


Now, it seems that you are stuck with unsafe memory management. This is quite a difficult situation, as you can imagine. In this case, I would suggest an experiment: an asynchronous Unsubscribe. Or rather, the call to Unsubscribe will look synchronous from the outside, but will be implemented asynchronously.

The idea is simple: we will use the event queue to achieve synchronization. That is:

  • the call to Unsubscribe posts an event in the queue (payload Observer*) and then waits
  • when the Subject thread has processed the Unsubscribe event(s), it wakes up the waiting thread(s)

You can use either busy-waiting or a condition variable; I would advise a condition variable unless performance dictates otherwise.

Note: this solution completely fails to account for Subject dying prematurely.
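A rough sketch of the queue-based Unsubscribe described above, assuming C++11 threads. Everything except Subscribe, Unsubscribe and Notify is a hypothetical name (Event, PostAndWait, RaiseEvent, the ticket counters), and the destructor's Stop event is only there so the sketch shuts down cleanly; it does not address the Subject-dies-first problem in general.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <set>
#include <thread>

class Observer {
public:
    void Notify() { ++notified; }
    int notified = 0;  // hypothetical counter for demonstration
};

class Subject {
public:
    Subject() : worker(&Subject::Run, this) {}
    ~Subject() { PostAndWait(Event::Stop, nullptr); worker.join(); }

    void Subscribe(Observer* o)   { PostAndWait(Event::Subscribe, o); }
    // Returns only after the Subject thread has processed the removal.
    void Unsubscribe(Observer* o) { PostAndWait(Event::Unsubscribe, o); }
    // Stand-in for "something interesting happened".
    void RaiseEvent()             { PostAndWait(Event::Notify, nullptr); }

private:
    struct Event {
        enum Kind { Subscribe, Unsubscribe, Notify, Stop };
        Kind kind;
        Observer* observer;
    };

    void PostAndWait(Event::Kind kind, Observer* o) {
        std::unique_lock<std::mutex> l(m);
        queue.push_back(Event{kind, o});
        const unsigned long ticket = ++posted;  // position in the FIFO
        cv.notify_all();
        cv.wait(l, [&] { return processed >= ticket; });
    }

    void Run() {
        std::set<Observer*> observers;  // touched only by this thread
        for (;;) {
            Event e;
            {
                std::unique_lock<std::mutex> l(m);
                cv.wait(l, [&] { return !queue.empty(); });
                e = queue.front();
                queue.pop_front();
            }
            switch (e.kind) {
                case Event::Subscribe:   observers.insert(e.observer); break;
                case Event::Unsubscribe: observers.erase(e.observer);  break;
                case Event::Notify:
                    // No lock is held while client code runs.
                    for (Observer* o : observers) o->Notify();
                    break;
                case Event::Stop: break;
            }
            {
                std::lock_guard<std::mutex> l(m);
                ++processed;
            }
            cv.notify_all();  // wake whoever posted this event
            if (e.kind == Event::Stop) return;
        }
    }

    std::mutex m;
    std::condition_variable cv;
    std::deque<Event> queue;
    unsigned long posted = 0;
    unsigned long processed = 0;
    std::thread worker;  // declared last so the members above exist first
};
```

Because the observer set is owned by the Subject thread alone, no lock is held during Notify. The price is that calling Subscribe or Unsubscribe from inside Notify would wait on the very thread that is running the callback, so this sketch deadlocks in that case.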

Upvotes: 2

Sameer

Reputation: 4389

Change observers to a map whose key is the Observer* and whose value is a wrapper around the Observer. The wrapper includes a volatile boolean indicating whether the Observer is still valid. Subscribe creates the wrapper in the valid state; Unsubscribe marks it invalid. Notify is called on the wrapper instead of the actual Observer, and the wrapper forwards the call to the actual Observer only if it is valid (still subscribed).

#include <iterator>
#include <map>
#include <utility>
#include <vector>
#include <boost/bind.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

using namespace std;
using namespace boost;

class Observer
{
public:
    void Notify() {}
};

// Holds the real Observer plus a flag saying whether it is still subscribed.
class ObserverWrapper
{
public:
    Observer* wrappee;
    volatile bool valid;
    ObserverWrapper(Observer* o) : wrappee(o), valid(true)
    {
    }

    void Notify() 
    {
        if (valid) wrappee->Notify();
    }
};

class Subject
{
public:
    Subject() : t(boost::bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
        observers.insert(std::make_pair(o, sptr));
    }

    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        ObserverMap::iterator it = observers.find(o);
        if (it == observers.end()) return;  // was not subscribed
        it->second->valid = false;
        observers.erase(it);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            // shared_ptr copies keep each wrapper alive after Unsubscribe erases it
            vector<boost::shared_ptr<ObserverWrapper> > notifyList;
            {
                mutex::scoped_lock l(m);
                boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
            }
            // Should be no problem here
            for (size_t i = 0; i < notifyList.size(); ++i)
                notifyList[i]->Notify();
        }
    }

private:
    typedef map<Observer*, boost::shared_ptr<ObserverWrapper> > ObserverMap;

    void WaitForSomethingInterestingToHappen() {}  // placeholder

    ObserverMap observers;
    mutex m;   // declared before t so it is constructed before the thread starts
    thread t;
};

Upvotes: 0

Éric Malenfant

Reputation: 14148

Can you change the signature of Subscribe() and Unsubscribe()? Replacing the Observer* with something like shared_ptr<Observer> would make things easier.

EDIT: Replaced "easy" by "easier" above. For an example of how this is difficult to "get right", see the history of the Boost.Signals and of the adopted-but-not-yet-in-the-distribution Boost.Signals2 (formerly Boost.ThreadSafeSignals) libraries.

Upvotes: 3

Patrick

Reputation: 8318

I think this does the trick, if not very elegantly:

class Subject {
public:
    Subject() : t(boost::bind(&Subject::Run, this)) {}

    void Subscribe(Observer* o) {
        mutex::scoped_lock l(m);
        // Allocate on the heap: a shared_ptr must never own a stack object.
        boost::shared_ptr<InternalObserver> sp(new InternalObserver(o));
        observers[o] = sp;  // keyed on the pointer itself so Unsubscribe can find it
    }

    void Unsubscribe(Observer* o) {
        mutex::scoped_lock l(m);
        ObserverMap::iterator it = observers.find(o);
        if (it != observers.end())
            it->second->exists = false;  // Run() erases the entry later
    }

    void WaitForSomethingInterestingToHappen() {}

    void Run() {
        for (;;) {
            WaitForSomethingInterestingToHappen();
            mutex::scoped_lock l(m);
            ObserverMap::iterator it = observers.begin();
            while (it != observers.end()) {
                if (it->second->exists) {
                    it->second->m_o->Notify();
                    ++it;
                } else {
                    observers.erase(it++);
                }
                // Let waiting Subscribe/Unsubscribe callers in between observers.
                // The iterator stays valid: map insertion never invalidates it,
                // and only this thread erases.
                l.unlock();
                l.lock();
            }
        }
    }

private:
    class InternalObserver {
    public:
        InternalObserver(Observer* o) : m_o(o), exists(true) {}
        Observer* m_o;
        bool exists;  // only read or written while m is held
    };

    typedef map< Observer*, boost::shared_ptr<InternalObserver> > ObserverMap;

    ObserverMap observers;
    mutex m;   // declared before t so it is constructed before the thread starts
    thread t;
};

Upvotes: 0

Rob K

Reputation: 8926

Unsubscribe() should be synchronous, so that it does not return until Observer is guaranteed not to be in Subject's list anymore. That's the only way to do it safely.

ETA (moving my comment to the answer):

Since time doesn't seem to be an issue, take and release the mutex between notifying each observer. You won't be able to use for_each the way you are now, and you'll have to check the iterator to ensure that it's still valid.

for ( ... )
{
    take mutex
    check iterator validity
    notify
    release mutex
}

That will do what you want.
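For what it's worth, here is one way the loop above might look in C++11. This is only a sketch: instead of keeping an iterator and checking its validity, it re-finds its position on every step using a per-subscription id, which is an assumption of mine (nextId, the id-keyed map and NotifyAll are hypothetical names).

```cpp
#include <cassert>
#include <map>
#include <mutex>

class Observer {
public:
    void Notify() { ++notified; }
    int notified = 0;  // hypothetical counter for demonstration
};

class Subject {
public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers[nextId++] = o;
    }

    // Cannot return while o is inside Notify(), because Notify() runs
    // under the same mutex.
    void Unsubscribe(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        for (auto it = observers.begin(); it != observers.end(); ++it) {
            if (it->second == o) {
                observers.erase(it);
                return;
            }
        }
    }

    // One notification pass, taking and releasing the mutex per observer.
    void NotifyAll() {
        unsigned long cursor = 0;
        for (;;) {
            std::lock_guard<std::mutex> l(m);         // take mutex
            auto it = observers.lower_bound(cursor);  // re-find our position
            if (it == observers.end()) return;
            cursor = it->first + 1;
            it->second->Notify();                     // notify
        }                                             // release mutex
    }

private:
    std::mutex m;
    std::map<unsigned long, Observer*> observers;
    unsigned long nextId = 0;
};
```

Note that the mutex is still held during each individual Notify(), so the lock-ordering concern from the question applies to this variant, and a plain std::mutex means a client must not call Subscribe or Unsubscribe from inside Notify().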

Upvotes: 7

Greg Rogers

Reputation: 36459

Would something like this be satisfactory? It still isn't safe to unsubscribe an observer while it is being notified, though; for that you would need an interface like the one you mentioned (as far as I can tell).

Subscribe(Observer *x)
{
    mutex.lock();
    // add x to the list
    mutex.unlock();
}

Unsubscribe(Observer *x)
{
    mutex.lock();
    while (!ok_to_delete)
        cond.wait(mutex);
    // remove x from list
    mutex.unlock();
}

NotifyLoop()
{
    while (true) {
        // wait for something to trigger a notify

        mutex.lock();
        ok_to_delete = false;
        // build a list of observers to notify
        mutex.unlock();

        // notify all observers from the list saved earlier

        mutex.lock();
        ok_to_delete = true;
        cond.notify_all();
        mutex.unlock();
    }
}

If you want to be able to Unsubscribe() inside Notify() (a bad design decision on the client's part, IMO), you can add the thread id of the notifier thread to your data structure. In the Unsubscribe function you can check that thread id against the current thread's id (most threading libraries provide this, e.g. pthread_self). If they are the same, you can proceed without waiting on the condition variable.

NOTE: If the client is responsible for deleting the observer, this means you run into the situation where inside the Notify callback, you will have unsubscribed and deleted the observer, but are still executing something with that junked this pointer. It is something the client will have to be aware of and to only delete it at the end of the Notify().
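Here is how the pseudocode plus the thread-id check might be fleshed out with C++11 primitives. Treat it as a sketch under assumptions: there is a single notifier thread, the onNotify hook and notified counter are hypothetical additions so the demo can unsubscribe from inside a callback, and the "still subscribed?" re-check before each callback is my own addition to cope with observers removed earlier in the same pass.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

class Observer {
public:
    std::function<void()> onNotify;  // hypothetical hook for the demo
    void Notify() { ++notified; if (onNotify) onNotify(); }
    int notified = 0;
};

class Subject {
public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(o);
    }

    void Unsubscribe(Observer* o) {
        std::unique_lock<std::mutex> l(m);
        // Called from inside Notify(): we are the notifier, so don't wait.
        if (std::this_thread::get_id() != notifier)
            cv.wait(l, [this] { return okToDelete; });
        observers.erase(o);
    }

    // One notification pass, run on the notifier thread.
    void NotifyAll() {
        std::vector<Observer*> snapshot;
        {
            std::lock_guard<std::mutex> l(m);
            okToDelete = false;
            notifier = std::this_thread::get_id();
            snapshot.assign(observers.begin(), observers.end());
        }
        for (Observer* o : snapshot) {
            bool stillSubscribed;
            {
                // Other threads are blocked in Unsubscribe during the pass,
                // so only callbacks on this thread can have removed o.
                std::lock_guard<std::mutex> l(m);
                stillSubscribed = observers.count(o) != 0;
            }
            if (stillSubscribed) o->Notify();
        }
        {
            std::lock_guard<std::mutex> l(m);
            okToDelete = true;
            notifier = std::thread::id();  // "no thread"
        }
        cv.notify_all();
    }

private:
    std::mutex m;
    std::condition_variable cv;
    bool okToDelete = true;
    std::thread::id notifier;
    std::set<Observer*> observers;
};
```

No lock is held while the callback runs, so the lock-ordering problem goes away, but an Unsubscribe from another thread can be delayed for the length of a whole pass.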

Upvotes: 1

Diego Sevilla

Reputation: 29021

Mmm... I don't really understand your question, because if a client calls Unsubscribe you should be able to let the client delete it (it's not used by you). However, if for some reason you cannot close the relationship once the client unsubscribes the observer, you could add a new operation to "Subject" to safely delete an Observer, or just one that lets clients signal that they are no longer interested in an Observer.

Rethink edit: OK, now I think I understand what your problem is. I think the best solution is the following:

  1. Give each stored observer element a "valid" flag. This flag decides whether the observer is notified while you're in the notification loop.
  2. You need a mutex to protect the access to that "valid" flag. Then, the unsubscribe operation locks the mutex for the "valid" flag, sets it to false for the selected observer.
  3. The notification loop also has to lock and unlock the mutex of the valid flag, and only act upon observers that are "valid".

Given that the unsubscribe operation will block on the mutex to reset the valid flag (and that that particular Observer won't be used any more in your thread), the code is thread safe, and clients can delete any observer as soon as unsubscribe has returned.
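A minimal sketch of the three steps, assuming C++11. The Entry struct, the two mutex names and NotifyAll are hypothetical; the important detail is that the flag's mutex stays locked for the duration of each Notify() call, otherwise Unsubscribe could return while the callback is still running.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

class Observer {
public:
    void Notify() { ++notified; }
    int notified = 0;  // hypothetical counter for demonstration
};

class Subject {
    struct Entry {
        explicit Entry(Observer* o) : observer(o) {}
        Observer* observer;
        bool valid = true;  // guarded by flagMutex
    };

public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> l(listMutex);
        entries[o] = std::make_shared<Entry>(o);
    }

    void Unsubscribe(Observer* o) {
        std::shared_ptr<Entry> e;
        {
            std::lock_guard<std::mutex> l(listMutex);
            auto it = entries.find(o);
            if (it == entries.end()) return;
            e = it->second;
            entries.erase(it);
        }
        // Blocks while any Notify() is in flight, then invalidates the entry.
        std::lock_guard<std::mutex> f(flagMutex);
        e->valid = false;
    }

    // One notification pass.
    void NotifyAll() {
        std::vector<std::shared_ptr<Entry>> snapshot;
        {
            std::lock_guard<std::mutex> l(listMutex);
            for (auto& kv : entries) snapshot.push_back(kv.second);
        }
        for (auto& e : snapshot) {
            std::lock_guard<std::mutex> f(flagMutex);
            if (e->valid) e->observer->Notify();
        }
    }

private:
    std::mutex listMutex;  // protects entries
    std::mutex flagMutex;  // protects every Entry::valid
    std::map<Observer*, std::shared_ptr<Entry>> entries;
};
```

The two mutexes are never held at the same time, but flagMutex is held across client code, so the lock-ordering worry raised in the question still applies to it.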

Upvotes: 1

m-sharp

Reputation: 17135

You could create a "to-delete queue" in the CSubject type. When you remove the Observer, you could call pSubject->QueueForDelete(pObserver). Then, when the subject thread is between notifications, it could safely delete the observers in the queue.
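Something along these lines, as a sketch only. It assumes observers are allocated with new and that the client hands ownership to the subject by calling QueueForDelete instead of deleting the observer itself; the destroyed counter and NotifyAll are hypothetical names for the demo.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <vector>

class Observer {
public:
    explicit Observer(int* destroyed = nullptr) : destroyed(destroyed) {}
    ~Observer() { if (destroyed) ++*destroyed; }
    void Notify() { ++notified; }
    int notified = 0;   // hypothetical counter for demonstration
private:
    int* destroyed;     // hypothetical: lets the demo observe the deletion
};

class Subject {
public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(o);
    }

    // Unsubscribes o and hands it to the subject thread for deletion.
    void QueueForDelete(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers.erase(o);
        toDelete.push_back(o);
    }

    // One notification pass, run on the subject thread.
    void NotifyAll() {
        std::vector<Observer*> snapshot;
        {
            std::lock_guard<std::mutex> l(m);
            snapshot.assign(observers.begin(), observers.end());
        }
        // Anything queued during this loop is still alive, so a stale
        // entry in the snapshot is harmless.
        for (Observer* o : snapshot) o->Notify();

        std::vector<Observer*> doomed;
        {
            std::lock_guard<std::mutex> l(m);
            doomed.swap(toDelete);
        }
        // Between notifications: nothing on this thread refers to them now.
        for (Observer* o : doomed) delete o;
    }

private:
    std::mutex m;
    std::set<Observer*> observers;
    std::vector<Observer*> toDelete;
};
```

An observer may receive one last Notify() after being queued, and it is destroyed on the subject's thread rather than the client's, which the client has to be able to live with.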

Upvotes: 1

anon

Reputation:

Rather than have clients get a "SafeToDelete" notification, provide them with an IsSubscribed( Observer *) method. The client code then becomes:

subject.Unsubscribe( observer );
while( subject.IsSubscribed( observer ) ) {
   sleep_some_short_time;   // OS specific sleep stuff
}
delete observer;

which is not too onerous.

Upvotes: 1
