Pupsik

Reputation: 742

How to handle thread-safe callback registration and execution in C++?

For example, I have an EventGenerator class that calls IEventHandler::onEvent for all registered event handlers:

class IEventHandler {
public: virtual void onEvent(...) = 0;
};


class EventGenerator {
private: 
   std::vector<IEventHandler*> _handlers;
   std::mutex _mutex; // [1]
public:
   void AddHandler(IEventHandler* handler) {
      std::lock_guard<std::mutex> lck(_mutex); // [2]
      _handlers.push_back(handler);
   }
   void RemoveHandler(IEventHandler* handler) {
      std::lock_guard<std::mutex> lck(_mutex); // [3]
      // remove from "_handlers"
   }
private:
   void threadMainTask() {

      while(true) {

         // Do some work ...

         // Post event to all registered handlers
         {
            std::lock_guard<std::mutex> lck(_mutex); // [4]
            for(auto& h : _handlers) { h->onEvent(...); }
         }

         // Do some work ...

      }
    }
};

The code should be thread safe: AddHandler and RemoveHandler may be called from any thread while threadMainTask is posting events. To support this, I have the synchronization marked [1]-[4] in the code comments above.

This code works until, during the execution of IEventHandler::onEvent(...), the handler tries to call EventGenerator::RemoveHandler or EventGenerator::AddHandler. The mutex at [4] is already locked by the same thread, so the result is a runtime exception.
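
For example, a handler that unregisters itself from inside the callback (a minimal sketch, the class name is made up) hits exactly this case:

class SelfRemovingHandler : public IEventHandler {
   EventGenerator& _gen;
public:
   explicit SelfRemovingHandler(EventGenerator& gen) : _gen(gen) {}
   void onEvent(...) override {
      // RemoveHandler tries to lock _mutex while threadMainTask
      // already holds it at [4]: recursive lock of a non-recursive std::mutex
      _gen.RemoveHandler(this);
   }
};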

What is the best approach to handle registration of event handlers and execution of the event handler callbacks in a thread-safe manner?




>> UPDATE <<

So based on the inputs, I've updated to the following design:

class IEventHandler {
public: virtual void onEvent(...) = 0;
};

class EventDelegate {
private: 
   IEventHandler* _handler;
   std::atomic<bool> _cancelled;
public:
   EventDelegate(IEventHandler* h) : _handler(h), _cancelled(false) {};
   void Cancel() { _cancelled = true; }
   void Invoke(...) { if (!_cancelled) _handler->onEvent(...); }
};

class EventGenerator {
private: 
   std::vector<std::shared_ptr<EventDelegate>> _handlers;
   std::mutex _mutex;
public:
   void AddHandler(std::shared_ptr<EventDelegate> handler) {
      std::lock_guard<std::mutex> lck(_mutex);
      _handlers.push_back(handler);
   }
   void RemoveHandler(std::shared_ptr<EventDelegate> handler) {
      std::lock_guard<std::mutex> lck(_mutex);
      // remove from "_handlers"
   }
private:
   void threadMainTask() {

      while(true) {

         // Do some work ...

         std::vector<std::shared_ptr<EventDelegate>> handlers_copy;

         {
            std::lock_guard<std::mutex> lck(_mutex);
            handlers_copy = _handlers;
         }

         for(auto& h : handlers_copy) { h->Invoke(...); }

         // Do some work ...

      }
    }
};

As you can see, there is an additional class, EventDelegate, which has two purposes:

  1. hold the event callback
  2. allow the callback to be cancelled

In threadMainTask, I'm taking a local copy of the std::vector<std::shared_ptr<EventDelegate>> under the lock and releasing the lock before invoking the callbacks. This approach solves the issue where EventGenerator::{AddHandler,RemoveHandler} is called during IEventHandler::onEvent(...).
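
To make the intended usage concrete, here is a small sketch (MyHandler and main are illustrative, not part of the real code):

int main() {
   EventGenerator generator;
   auto handler = std::make_shared<MyHandler>();              // some IEventHandler implementation
   auto delegate = std::make_shared<EventDelegate>(handler.get());

   generator.AddHandler(delegate);

   // ... later, possibly from another thread or even from inside onEvent():
   delegate->Cancel();                  // no lock needed, _cancelled is atomic
   generator.RemoveHandler(delegate);   // safe: threadMainTask holds no lock while invoking
}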

Any thoughts about the new design?

Upvotes: 4

Views: 7340

Answers (2)

greywolf82

Reputation: 22173

I can't see a correct solution here. From your update I can see a problem: you are not synchronizing the Invoke method with callback removal. There is an atomic flag, but it's not enough. Example: just after this line of code:

if (!_cancelled)

Another thread calls the remove method. What can happen is that onEvent() is called anyway: even though the remove method has already taken the callback out of the list and returned, there is nothing keeping this execution flow synchronized. The same problem applies to @bobah's answer.
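
For illustration only (this sketch is mine, not code from the question), one way to close that window is to make Cancel and Invoke mutually exclusive inside the delegate, so that Cancel does not return while onEvent is still running:

class EventDelegate {
   IEventHandler* _handler;
   bool _cancelled = false;
   std::mutex _m;
public:
   explicit EventDelegate(IEventHandler* h) : _handler(h) {}
   void Cancel() {
      std::lock_guard<std::mutex> lck(_m);  // blocks until a running Invoke() finishes
      _cancelled = true;
   }
   void Invoke(...) {
      std::lock_guard<std::mutex> lck(_m);
      if (!_cancelled) _handler->onEvent(...);
   }
};

Note that with this scheme a handler must not call Cancel() on its own delegate from inside onEvent(), or it will deadlock on _m.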

Upvotes: 0

bobah

Reputation: 18864

A copy-on-write vector implemented with an atomic swap of shared_ptrs (on the assumption that callback registration occurs far less frequently than the events the callbacks are notified about):

using callback_t = std::shared_ptr<std::function<void(event_t const&)> >;
using callbacks_t = std::shared_ptr<std::vector<callback_t> >;
callbacks_t callbacks_;
mutex_t mutex_; // a mutex of your choice

void register_callback(callback_t cb) // "register" is a reserved keyword, hence the longer name
{
    // the mutex is to serialize concurrent callbacks registrations
    // this is not always necessary, as depending on the application
    // architecture, single writer may be enforced by design
    scoped_lock lock(mutex_);

    auto callbacks = atomic_load(&callbacks_);

    auto new_callbacks = std::make_shared< std::vector<callback_t> >();
    new_callbacks->reserve(callbacks->size() + 1);
    *new_callbacks = *callbacks;
    new_callbacks->push_back(std::move(cb));

    atomic_store(&callbacks_, new_callbacks);
}

void invoke(event_t const& evt)
{
    auto callbacks = atomic_load(&callbacks_);

    // many people wrap each callback invocation into a try-catch
    // and de-register on exception
    for(auto& cb: *callbacks) (*cb)(evt); 
}

Specifically on the subject of the asynchronous behaviour where a callback is executed while it is being de-registered, the best approach here is to remember the Separation of Concerns principle.

The callback should not be able to die until it has been executed. This is achieved via another classic trick, an "extra level of indirection". Namely, instead of registering the user-provided callback directly, one wraps it in something like the class below. De-registration then, apart from updating the vector, calls the discharge() method on the wrapper and can even tell the caller whether the callback execution had already finished.

template <class CB> struct cb_wrapper
{
    mutable std::atomic<bool> done_{false};
    CB cb_;
    cb_wrapper(CB&& cb): cb_(std::move(cb)) {}

    bool discharge()
    {
        bool not_done = false;
        return done_.compare_exchange_strong(not_done, true);
    }

    void operator()(event_t const& evt)
    {
        if (discharge())
        {
            cb_(evt);
        }
    }

};
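
For illustration, here is a sketch of how the caller could keep a handle to the wrapper so that de-registration can call discharge() and learn whether the callback already ran (everything beyond the definitions above is an assumption):

// the caller keeps the wrapper so it can be discharged later
auto wrapper = std::make_shared<cb_wrapper<std::function<void(event_t const&)>>>(
    [](event_t const&) { /* handle the event */ });

// register a callback that just forwards to the wrapper
register_callback(std::make_shared<std::function<void(event_t const&)>>(
    [wrapper](event_t const& evt) { (*wrapper)(evt); }));

// later, after removing the callback from the vector:
bool never_ran = wrapper->discharge(); // true => the callback will not run anymore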

Upvotes: 4
