xlw12

Reputation: 781

Queued thread notification

So that you can picture my problem, I will describe how my design is used:

In the class SerialInterface there is a thread that checks every 10 ms whether a message has been received. The class is implemented using the Observer pattern to notify other classes about each newly received message/byte.

The Notify method of the Observer pattern blocks until every subject has finished its operation. Because I want to avoid any lag, I would like to notify the subjects asynchronously.

My first thought was events (condition variables in C++11).

The implementation would look like this:

class SerialInterface: public Observer {
private:
  .....
    void NotifyThread() {
        while (mRunThreadNotify) {
            std::unique_lock<std::mutex> lock(mMutex);
            mCv.wait(lock);

            NotifyObservers();
        }
    }

    std::mutex              mMutex;
    std::condition_variable mCv;
    std::atomic_bool        mRunThreadNotify;
    std::thread             mThreadNotify;
  .....
};

Now I can notify asynchronously via mCv.notify_all();

The problem now is the following:

What if NotifyThread() is in the middle of notifying the subjects when a new notify event arrives? It would complete the current notification, and the new state would be skipped.

So my second approach was to create a counter for notifications and let it act like a queue:

class SerialInterface: public Observer {
public:
  ....
private:
  .....
    void NotifyThread() {
        while (mRunThreadNotify) {
            if (mNotifications > 0) {
                NotifyObservers();
                mNotifications--;
            } else {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        }
    }

    std::atomic<size_t> mNotifications;
    std::atomic_bool    mRunThreadNotify;
    std::thread         mThreadNotify;
  .....
};

Here I have to increment the variable mNotifications to notify the subjects. But this solution does not look ideal to me, because it uses std::this_thread::sleep_for with a fixed waiting time.

Are there any suggestions or other approaches for this problem?

Upvotes: 0

Views: 570

Answers (2)

911

Reputation: 928

When a notification is received, you can check whether your requirements have been met at that time.

The requirement can be expressed as a predicate passed as the second argument to wait():

mCvNotifications.wait(lock, [](){return true_if_requirements_met;}); 

If the requirement has not been met, the thread will stay in the waiting state despite the notification.
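As a rough sketch of how the predicate form could address the skipped-notification problem from the question, the example below combines a pending counter with a predicate wait, so no fixed sleep is needed and no notification is lost while the worker is busy. The names `Notifier`, `Post`, and the callback parameter are hypothetical and not part of the original code; the callback stands in for NotifyObservers().

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper: runs a callback once per posted notification
// on its own thread. All names here are illustrative assumptions.
class Notifier {
public:
    explicit Notifier(std::function<void()> callback)
        : mCallback(std::move(callback)), mWorker([this] { Run(); }) {}

    // Requests shutdown; the worker drains pending notifications first.
    ~Notifier() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mRunning = false;
        }
        mCv.notify_one();
        mWorker.join();
    }

    // Called by the producer (e.g. the serial thread): never blocks on observers.
    void Post() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            ++mPending;
        }
        mCv.notify_one();
    }

private:
    void Run() {
        std::unique_lock<std::mutex> lock(mMutex);
        for (;;) {
            // Returns immediately if work is already pending; otherwise
            // sleeps until Post() or the destructor changes the state.
            mCv.wait(lock, [this] { return mPending > 0 || !mRunning; });
            if (mPending == 0) break;  // shutting down and nothing left to deliver
            --mPending;
            lock.unlock();             // do not hold the lock while observers run
            mCallback();
            lock.lock();
        }
    }

    std::function<void()> mCallback;
    std::mutex mMutex;
    std::condition_variable mCv;
    std::size_t mPending = 0;
    bool mRunning = true;
    std::thread mWorker;  // declared last so the other members exist before it starts
};
```

Because the counter is only read and modified under the mutex, a Post() that arrives while the callback is running is still seen by the next predicate check instead of being dropped.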

Upvotes: 1

Jeremy Friesner

Reputation: 73081

It seems to me that you want to separate the real-time behavior (the 10 ms serial poll) from the rest of the program so that the real-time thread will never be held off waiting for any other routines. Given that, my suggestion would be to split the pattern into two parts:

  1. The real-time part, which does nothing but receive incoming serial data and append it to the end of a FIFO queue (in a thread-safe manner, of course).

  2. The non-real-time part (running in a different thread), in which data is popped from the head of the FIFO queue and handed around to all of the software components that want to react to it. This part can be as fast or as slow as it likes, since it will not hold up the real-time thread.

The FIFO queue part is a standard producer-consumer problem; there are various ways to implement it, but the way I usually do it is with a deque, a lock, and a condition variable (pseudocode):

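// Assumed members: std::mutex m_lock; std::deque<TheBytesObject> m_fifo;
// std::condition_variable m_condition_variable;

// Called by the real-time/serial thread when it has received serial data
void AppendBytesToQueue(const TheBytesObject & bytes)
{
   bool wasQueueEmptyBefore;
   {
      std::lock_guard<std::mutex> guard(m_lock);
      wasQueueEmptyBefore = m_fifo.empty();
      m_fifo.push_back(bytes);
   }
   // Wake the handling thread only on the empty -> non-empty transition
   if (wasQueueEmptyBefore) m_condition_variable.notify_one();
}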

// Called by the non-real-time/handling thread after it was
// woken up by the condition variable's notification (outQueue should
// be a reference to an empty deque that gets filled by this
// method)
void GetNewBytesFromQueue(std::deque<TheBytesObject> & outQueue)
{
   std::lock_guard<std::mutex> guard(m_lock);
   std::swap(m_fifo, outQueue);  // fast O(1) operation so m_lock will never be held for long
}

... and then after calling GetNewBytesFromQueue(), the handling/non-real-time thread can iterate over the contents of its temporary deque and deal with each item in order, without any risk of affecting the serial thread's performance.

Upvotes: 2
