jmasterx
jmasterx

Reputation: 54123

Safe Message Queue with multiple threads

Here is what I essentially have:

I have thread A that periodically checks for messages and processes them.

Threads B and C need to send messages to A.

The problem arises when B and C or B or C try to send a message to A while A is processing a message and thus accessing the queue.

How is this problem usually solved?

Thanks

Upvotes: 6

Views: 7571

Answers (2)

Muthu
Muthu

Reputation: 2685

If you are not on windows or if you are implementing something which is cross platform in C++, try using the Queue from ACE libraries.

ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue;

As a sample from ACE library samples, You can then use For putting message to the queue:

  ACE_NEW_RETURN (mb,
              ACE_Message_Block (rb.size (),
              ACE_Message_Block::MB_DATA,
              0,
              buffer),
              0);
  mb->msg_priority (ACE_Utils::truncate_cast<unsigned long> (rb.size ()));
  mb->wr_ptr (rb.size ());

  ACE_DEBUG ((LM_DEBUG,
          "enqueueing message of size %d\n",
          mb->msg_priority ()));

 // Enqueue in priority order.
 if (msg_queue->enqueue_prio (mb) == -1)
ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));

for getting from queue:

 ACE_Message_Block *mb = 0;

 msg_queue->dequeue_head (mb) == -1;
 int length = ACE_Utils::truncate_cast<int> (mb->length ());

 if (length > 0)
   ACE_OS::puts (mb->rd_ptr ());

  // Free up the buffer memory and the Message_Block.
  ACE_Allocator::instance ()->free (mb->rd_ptr ());
  mb->release ();

The advantage is you can change the synchronization primitive very easily without having to write too much of code.

Upvotes: 0

Nate
Nate

Reputation: 12829

This is normally solved using mutexes, or other multi-thread protection mechanisms.

If you are working on windows, MFC provides the CMutex class for this problem.

If you are working on a posix system, the posix api provides the pthread_mutex_lock, pthread_mutex_unlock, and pthread_mutex_trylock functions.

Some basic pseudocode would be handy to demonstrate their use in your case:

pthread_mutex_t mutex; *or* CMutex mutex;
Q queue;  // <-- both mutex and queue are global state, whether they are
          //     global variables, or passed in as parameters, they must
          //     be the shared by all threads.

int threadA(/* params */){
    while( threadAStillRunning ){
        // perform some non-critical actions ...
        pthread_mutex_lock(mutex) *or* mutex.Lock()
        // perform critical actions ...
        msg = queue.receiveMessage()
        pthread_mutex_unlock(mutex) *or* mutex.Unlock()
        // perform more non-critical actions
    }
}

int threadBorC(/* params */){
    while( theadBorCStillRunning ){
        // perform some non-critical actions ...
        pthread_mutex_lock(mutex) *or* mutex.Lock()
        // perform critical actions ...
        queue.sendMessage(a_msg)
        pthread_mutex_unlock(mutex) *or* mutex.Unlock()
    }
}

For all three threads, their ability to act on the queue hinges on their ability to acquire the mutex - they will simply block and wait until the mutex is acquired. This prevents conflicts arising from the use of that resource.

Upvotes: 4

Related Questions