Reputation: 13806
I have a Subject which offers Subscribe(Observer*) and Unsubscribe(Observer*) to clients. Subject runs in its own thread (from which it calls Notify() on subscribed Observers) and a mutex protects its internal list of Observers.
I would like client code - which I don't control - to be able to safely delete an Observer after it is unsubscribed. How can this be achieved?
Edit
Some illustrative code follows. The problem is how to prevent Unsubscribe happening while Run is at the 'Problem here' comment. Then I could call back on a deleted object. Alternatively, if I hold the mutex throughout rather than making the copy, I can deadlock certain clients.
#include <algorithm>
#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
using namespace std;
using namespace boost;
class Observer
{
public:
    void Notify() {}
};
class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }
    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.insert(o);
    }
    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.erase(o);
    }
    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            set<Observer*> notifyList;
            {
                mutex::scoped_lock l(m);
                notifyList = observers;
            }
            // Problem here
            for_each(notifyList.begin(), notifyList.end(),
                     mem_fun(&Observer::Notify));
        }
    }
private:
    set<Observer*> observers;
    thread t;
    mutex m;
};
Edit
I can't Notify observers while holding the mutex because of the deadlock risk. The most obvious way this can happen - the client calls Subscribe or Unsubscribe from inside Notify - is easily remedied by making the mutex recursive. More insidious is the risk of intermittent deadlock on different threads.
I'm in a multithreaded environment, so at any point in a thread's execution, it will typically hold a sequence of locks L1, L2, ... Ln. Another thread will hold locks K1, K2, ... Km. A properly written client will ensure that different threads will always acquire locks in the same order. But when clients interact with my Subject's mutex - call it X - this strategy will be broken: Calls to Subscribe / Unsubscribe acquire locks in the order L1, L2, ... Ln, X. Calls to Notify from my Subject thread acquire locks in the order X, K1, K2, ... Km. If any of the Li or Kj can coincide down any call path, the client suffers an intermittent deadlock, with little prospect of debugging it. Since I don't control the client code, I can't do this.
Upvotes: 11
Views: 2670
Reputation: 300199
The "ideal" solution would involve using shared_ptr and weak_ptr. However, in order to be generic, it also has to account for the issue of Subject being dropped before some of its Observers (yes, that can happen too).
class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o);
    void Unsubscribe(std::weak_ptr<Observer> o);
private:
    std::mutex mutex;
    // weak_ptr has no operator<, so the set needs an owner-based ordering
    std::set< std::weak_ptr<Observer>, std::owner_less< std::weak_ptr<Observer> > > observers;
};
class Observer: boost::noncopyable {
public:
    ~Observer();
    void Notify();
private:
    std::mutex mutex;
    std::weak_ptr<Subject> subject;
};
With this structure, we create a cyclic graph, but with a judicious use of weak_ptr so that both Observer and Subject can be destroyed without coordination.
Note: I have assumed, for simplicity, that an Observer observes a single Subject at a time, but it could easily observe multiple subjects.
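A minimal sketch of the notification side of this design, assuming C++11 and that clients keep their observers in std::shared_ptr. NotifyAll and the notified counter are hypothetical names added for illustration. The point is that promoting each weak_ptr with lock() keeps the observer alive for the duration of the callback.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <set>
#include <vector>

class Observer {
public:
    void Notify() { ++notified; }
    int notified = 0;   // illustration only: counts callbacks
};

class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(o);
    }
    void Unsubscribe(std::weak_ptr<Observer> o) {
        std::lock_guard<std::mutex> l(m);
        observers.erase(o);
    }
    // Hypothetical helper: one notification pass, run on the Subject thread.
    void NotifyAll() {
        std::vector<std::weak_ptr<Observer>> snapshot;
        {
            std::lock_guard<std::mutex> l(m);
            snapshot.assign(observers.begin(), observers.end());
        }
        for (auto& w : snapshot) {
            // lock() returns an owning pointer, or null if the observer is gone.
            if (std::shared_ptr<Observer> o = w.lock())
                o->Notify();   // called without holding m
        }
    }
private:
    std::mutex m;
    std::set<std::weak_ptr<Observer>,
             std::owner_less<std::weak_ptr<Observer>>> observers;
};
```

With this shape a late Notify can still arrive shortly after Unsubscribe returns, but it can never reach a destroyed object, because the snapshot loop holds its own reference while it calls.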
Now, it seems that you are stuck with unsafe memory management. This is quite a difficult situation, as you can imagine. In this case, I would suggest an experiment: an asynchronous Unsubscribe. Or at least, the call to Unsubscribe will be synchronous from the outside, but be implemented asynchronously.
The idea is simple: we will use the event queue to achieve synchronization. That is:
- Unsubscribe posts an event in the queue (payload Observer*) and then waits
- once the Subject thread has processed the Unsubscribe event(s), it wakes up the waiting thread(s)
You can use either busy-waiting or a condition variable; I would advise a condition variable unless performance dictates otherwise.
Note: this solution completely fails to account for Subject dying prematurely.
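The queue-based Unsubscribe could be sketched roughly as follows, assuming C++11 threads. Post, Count, the ticket counters and the calls counter are hypothetical names introduced for illustration, and Post stands in for whatever wakes the Subject thread in the real code. As noted above, a Subject destroyed while clients are still waiting is not handled.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

class Observer {
public:
    void Notify() { ++calls; }
    std::atomic<int> calls{0};   // illustration only
};

class Subject {
public:
    Subject() : t(&Subject::Run, this) {}
    ~Subject() {
        { std::lock_guard<std::mutex> l(m); stop = true; }
        wake.notify_all();
        t.join();
    }
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(o);
    }
    // Synchronous from the outside: returns only after the Subject thread
    // has removed o, so the caller may delete it right away.
    // Must not be called from inside Notify(), or it waits on itself.
    void Unsubscribe(Observer* o) {
        std::unique_lock<std::mutex> l(m);
        pending.push_back(o);
        const unsigned long ticket = ++requested;
        wake.notify_all();
        done.wait(l, [&] { return processed >= ticket; });
    }
    // Stand-in for "something interesting happened".
    void Post() {
        { std::lock_guard<std::mutex> l(m); ++events; }
        wake.notify_all();
    }
    std::size_t Count() {
        std::lock_guard<std::mutex> l(m);
        return observers.size();
    }
private:
    void Run() {
        for (;;) {
            std::vector<Observer*> notifyList;
            {
                std::unique_lock<std::mutex> l(m);
                wake.wait(l, [&] { return stop || events > 0 || !pending.empty(); });
                // Requests are applied only here, never in mid-notification.
                for (Observer* o : pending) observers.erase(o);
                pending.clear();
                processed = requested;
                done.notify_all();
                if (stop) return;
                if (events > 0) {
                    --events;
                    notifyList.assign(observers.begin(), observers.end());
                }
            }
            for (Observer* o : notifyList) o->Notify();   // no lock held
        }
    }
    std::mutex m;
    std::condition_variable wake, done;
    std::set<Observer*> observers;
    std::vector<Observer*> pending;
    unsigned long requested = 0, processed = 0;
    int events = 0;
    bool stop = false;
    std::thread t;   // declared last so the other members exist before Run starts
};
```

Because unsubscribe requests are only drained at the top of the loop, a caller blocked in Unsubscribe is released strictly between notification passes, which is what makes the subsequent delete safe.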
Upvotes: 2
Reputation: 4389
Change observers to a map with key Observer* and value a wrapper of Observer. The wrapper includes a volatile boolean to indicate if the Observer is valid. In the Subscribe method, the wrapper object is created in the valid state. In the Unsubscribe method, the wrapper is marked as invalid. Notify is called on the wrapper instead of the actual Observer. The wrapper will call Notify on the actual Observer if it is valid (still subscribed).
#include <algorithm>
#include <iterator>
#include <map>
#include <vector>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/adaptor/map.hpp>
using namespace std;
class Observer
{
public:
    void Notify() {}
};
class ObserverWrapper : public Observer
{
public:
    Observer* wrappee;
    volatile bool valid;
    ObserverWrapper(Observer* o)
    {
        wrappee = o;
        valid = true;
    }
    void Notify()
    {
        if (valid) wrappee->Notify();
    }
};
class Subject
{
public:
    Subject() : t(boost::bind(&Subject::Run, this))
    {
    }
    void Subscribe(Observer* o)
    {
        boost::mutex::scoped_lock l(m);
        boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
        observers.insert(std::make_pair(o, sptr));
    }
    void Unsubscribe(Observer* o)
    {
        boost::mutex::scoped_lock l(m);
        ObserverMap::iterator it = observers.find(o);
        if (it == observers.end()) return; // not subscribed
        it->second->valid = false;
        observers.erase(it);
    }
    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            // shared_ptr copies keep each wrapper alive after Unsubscribe erases it
            vector<boost::shared_ptr<ObserverWrapper> > notifyList;
            {
                boost::mutex::scoped_lock l(m);
                boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
            }
            // Should be no problem here
            for (size_t i = 0; i < notifyList.size(); ++i)
                notifyList[i]->Notify();
        }
    }
private:
    typedef map<Observer*, boost::shared_ptr<ObserverWrapper> > ObserverMap;
    ObserverMap observers;
    boost::mutex m;
    boost::thread t; // declared last so the mutex exists before the thread starts
};
Upvotes: 0
Reputation: 14148
Can you change the signature of Subscribe() and Unsubscribe()? Replacing the Observer* with something like shared_ptr<Observer> would make things easier.
EDIT: Replaced "easy" by "easier" above. For an example of how this is difficult to "get right", see the history of the Boost.Signals and of the adopted-but-not-yet-in-the-distribution Boost.Signals2 (formerly Boost.ThreadSafeSignals) libraries.
Upvotes: 3
Reputation: 8318
I think this does the trick if not very elegantly:
class Subject {
public:
    Subject() : t(boost::bind(&Subject::Run, this)) { }
    void Subscribe(Observer* o) {
        boost::mutex::scoped_lock l(m);
        // Heap-allocate: a shared_ptr must never own a stack object.
        boost::shared_ptr<InternalObserver> sp(new InternalObserver(o));
        observers[MakeKey(o)] = sp;
    }
    void Unsubscribe(Observer* o) {
        boost::mutex::scoped_lock l(m);
        ObserverMap::iterator it = observers.find(MakeKey(o));
        if (it != observers.end())
            it->second->exists = false; // erased later by Run
    }
    void WaitForSomethingInterestingToHappen() {}
    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            boost::mutex::scoped_lock l(m);
            ObserverMap::iterator it = observers.begin();
            while (it != observers.end())
            {
                if (it->second->exists)
                {
                    it->second->m_o->Notify(); // called with the mutex held
                    ++it;
                }
                else
                {
                    observers.erase(it++);
                }
                // Let waiting Subscribe/Unsubscribe calls in between observers.
                // Inserts do not invalidate map iterators and only Run erases.
                l.unlock();
                l.lock();
            }
        }
    }
private:
    class InternalObserver {
    public:
        InternalObserver(Observer* o) : m_o( o ), exists( true ) {}
        Observer* m_o;
        bool exists;
    };
    typedef Observer* Key;
    Key MakeKey(Observer* o) {
        return o; // the address identifies the observer while it is subscribed
    }
    typedef std::map< Key, boost::shared_ptr<InternalObserver> > ObserverMap;
    ObserverMap observers;
    boost::mutex m;
    boost::thread t; // declared last so the mutex exists before the thread starts
};
Upvotes: 0
Reputation: 8926
Unsubscribe() should be synchronous, so that it does not return until Observer is guaranteed not to be in Subject's list anymore. That's the only way to do it safely.
ETA (moving my comment to the answer):
Since time doesn't seem to be an issue, take and release the mutex between notifying each observer. You won't be able to use for_each the way you are now, and you'll have to check the iterator to ensure that it's still valid.
for ( ... )
{
take mutex
check iterator validity
notify
release mutex
}
That will do what you want.
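One way to read the pseudocode above, as a rough C++11 sketch. The subscription ids, NotifyAll and on_notify are assumptions added for illustration: instead of testing an old iterator, the loop looks up "the next entry after the last one visited" each time it re-takes the mutex. A recursive mutex is assumed so that a client may call Unsubscribe from inside Notify. Since Notify runs with the mutex held, the lock-ordering risk described in the question still applies.

```cpp
#include <cassert>
#include <functional>
#include <iterator>
#include <map>
#include <mutex>

struct Observer {
    std::function<void()> on_notify;   // illustration only
    void Notify() { if (on_notify) on_notify(); }
};

class Subject {
public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::recursive_mutex> l(m);
        observers[++lastId] = o;   // ids only ever grow
    }
    void Unsubscribe(Observer* o) {
        std::lock_guard<std::recursive_mutex> l(m);
        for (auto it = observers.begin(); it != observers.end(); )
            it = (it->second == o) ? observers.erase(it) : std::next(it);
    }
    // Hypothetical helper: one notification pass on the Subject thread.
    void NotifyAll() {
        unsigned long visited = 0;
        for (;;) {
            std::lock_guard<std::recursive_mutex> l(m);   // take mutex
            auto it = observers.upper_bound(visited);     // fresh, valid iterator
            if (it == observers.end()) break;
            visited = it->first;
            it->second->Notify();                         // notify under the lock
        }                                                 // release mutex
    }
private:
    std::recursive_mutex m;
    std::map<unsigned long, Observer*> observers;
    unsigned long lastId = 0;
};
```

While a callback is running, an Unsubscribe from another thread blocks on the mutex, so once it returns the observer is neither listed nor in use.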
Upvotes: 7
Reputation: 36459
Would something like this be satisfactory? It still isn't safe to unsubscribe an observer while being notified though, for that you would need an interface like you mentioned (as far as I can tell).
Subscribe(Observer *x)
{
mutex.lock();
// add x to the list
mutex.unlock();
}
Unsubscribe(Observer *x)
{
mutex.lock();
while (!ok_to_delete)
cond.wait(mutex);
// remove x from list
mutex.unlock();
}
NotifyLoop()
{
while (true) {
// wait for something to trigger a notify
mutex.lock();
ok_to_delete = false;
// build a list of observers to notify
mutex.unlock();
// notify all observers from the list saved earlier
mutex.lock();
ok_to_delete = true;
cond.notify_all();
mutex.unlock();
}
}
If you want to be able to Unsubscribe() inside Notify() - (a bad design decision on the client IMO...) you can add the thread id of the notifier thread into your data structure. In the Unsubscribe function you can check that thread id against the current thread's id (most threading libraries provide this - eg. pthread_self). If they are the same, you can proceed without waiting on the condition variable.
NOTE: If the client is responsible for deleting the observer, this means you run into the situation where inside the Notify callback, you will have unsubscribed and deleted the observer, but are still executing something with that junked this pointer. It is something the client will have to be aware of and to only delete it at the end of the Notify().
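A possible C++11 rendering of the pseudocode and the thread-id check, offered as a sketch rather than a finished design. NotifyOnce, in_flight and on_notify are hypothetical names. One addition goes beyond the description above: when Unsubscribe is called from the notifier thread, the observer is also blanked out of the pass in progress, so it is not called later in the same pass.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

struct Observer {
    std::function<void()> on_notify;   // illustration only
    void Notify() { if (on_notify) on_notify(); }
};

class Subject {
public:
    void Subscribe(Observer* x) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(x);
    }
    void Unsubscribe(Observer* x) {
        std::unique_lock<std::mutex> l(m);
        if (std::this_thread::get_id() == notifier) {
            // Called from inside Notify(): waiting would deadlock, so drop x
            // from the pass in progress instead.
            std::replace(in_flight.begin(), in_flight.end(), x,
                         static_cast<Observer*>(nullptr));
        } else {
            cond.wait(l, [&] { return ok_to_delete; });
        }
        observers.erase(x);
    }
    // Hypothetical helper: one iteration of the notify loop.
    void NotifyOnce() {
        {
            std::lock_guard<std::mutex> l(m);
            ok_to_delete = false;
            notifier = std::this_thread::get_id();
            in_flight.assign(observers.begin(), observers.end());
        }
        // Only this thread touches in_flight during the pass.
        for (std::size_t i = 0; i < in_flight.size(); ++i)
            if (Observer* o = in_flight[i]) o->Notify();
        {
            std::lock_guard<std::mutex> l(m);
            in_flight.clear();
            ok_to_delete = true;
            notifier = std::thread::id();   // "no thread"
        }
        cond.notify_all();
    }
private:
    std::mutex m;
    std::condition_variable cond;
    std::set<Observer*> observers;
    std::vector<Observer*> in_flight;
    bool ok_to_delete = true;
    std::thread::id notifier;
};
```

The caveat in the NOTE still holds: an observer that unsubscribes itself inside Notify must not be deleted until that callback has returned.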
Upvotes: 1
Reputation: 29021
Mmm... I don't really understand your question, because if a client calls Unsubscribe you should be able to let the client delete it (it's not used by you). However, if for some reason you cannot close the relationship once the client unsubscribes the observer, you could add a new operation to "Subject" to safely delete an Observer, or just for the clients to signal that they are not interested in an Observer any more.
Rethink edit: OK, now I think I understand what your problem is. I think the best solution to your problem is doing the following:
Given that the unsubscribe operation will block on the mutex to reset the valid flag (and that that particular Observer won't be used any more in your thread), the code is thread safe, and clients can delete any observer as soon as unsubscribe has returned.
Upvotes: 1
Reputation: 17135
You could create a "to-delete queue" in the CSubject type. When you remove the Observer, you could call pSubject->QueueForDelete(pObserver). Then when the subject thread is between notifications, it could safely delete observers from the queue.
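A small sketch of such a queue, assuming C++11 and that the client is willing to hand deletion over to the subject. RunOnce, doomed and the calls/gone members are hypothetical and only there to make the behaviour visible.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <vector>

class Observer {
public:
    virtual ~Observer() { if (gone) *gone = true; }
    virtual void Notify() { ++calls; }
    int calls = 0;          // illustration only
    bool* gone = nullptr;   // illustration only: set when destroyed
};

class Subject {
public:
    ~Subject() { for (Observer* o : doomed) delete o; }
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(o);
    }
    // Unsubscribes o and transfers ownership to the Subject.
    // Call at most once per observer; the client must not delete o itself.
    void QueueForDelete(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers.erase(o);
        doomed.push_back(o);
    }
    // Hypothetical helper: one iteration of the subject thread's loop.
    void RunOnce() {
        std::vector<Observer*> toDelete, notifyList;
        {
            std::lock_guard<std::mutex> l(m);
            toDelete.swap(doomed);
            notifyList.assign(observers.begin(), observers.end());
        }
        // Between notifications: nothing queued here is in notifyList.
        for (Observer* o : toDelete) delete o;
        for (Observer* o : notifyList) o->Notify();
    }
private:
    std::mutex m;
    std::set<Observer*> observers;
    std::vector<Observer*> doomed;
};
```

An observer queued while a pass is running stays alive until the start of the next pass, so a callback already in progress never touches freed memory.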
Upvotes: 1
Reputation:
Rather than have clients get a "SafeToDelete" notification, provide them with an IsSubscribed( Observer *) method. The client code then becomes:
subject.Unsubscribe( observer );
while( subject.IsSubscribed( observer ) ) {
sleep_some_short_time; // OS specific sleep stuff
}
delete observer;
which is not too onerous.
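On the Subject side, IsSubscribed would have to keep answering true while a notification pass that includes the observer is still running. A rough C++11 sketch under that assumption follows; in_flight, RunOnce and on_notify are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <mutex>
#include <set>
#include <vector>

struct Observer {
    std::function<void()> on_notify;   // illustration only
    void Notify() { if (on_notify) on_notify(); }
};

class Subject {
public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(o);
    }
    void Unsubscribe(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        observers.erase(o);
    }
    // True while o is listed or part of the pass currently being notified.
    bool IsSubscribed(Observer* o) {
        std::lock_guard<std::mutex> l(m);
        return observers.count(o) != 0 ||
               std::find(in_flight.begin(), in_flight.end(), o) != in_flight.end();
    }
    // Hypothetical helper: one iteration of the subject thread's loop.
    void RunOnce() {
        std::vector<Observer*> local;
        {
            std::lock_guard<std::mutex> l(m);
            in_flight.assign(observers.begin(), observers.end());
            local = in_flight;
        }
        for (Observer* o : local) o->Notify();   // no lock held
        {
            std::lock_guard<std::mutex> l(m);
            in_flight.clear();
        }
    }
private:
    std::mutex m;
    std::set<Observer*> observers;
    std::vector<Observer*> in_flight;
};
```

The polling loop must run outside Notify; from inside a callback the observer is always still in flight, so the loop would never finish.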
Upvotes: 1