Reputation: 54123
Here is what I essentially have:
I have thread A that periodically checks for messages and processes them.
Threads B and C need to send messages to A.
The problem arises when B and C or B or C try to send a message to A while A is processing a message and thus accessing the queue.
How is this problem usually solved?
Thanks
Upvotes: 6
Views: 7571
Reputation: 2685
If you are not on windows or if you are implementing something which is cross platform in C++, try using the Queue from ACE libraries.
ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue;
As a sample from ACE library samples, You can then use For putting message to the queue:
ACE_NEW_RETURN (mb,
ACE_Message_Block (rb.size (),
ACE_Message_Block::MB_DATA,
0,
buffer),
0);
mb->msg_priority (ACE_Utils::truncate_cast<unsigned long> (rb.size ()));
mb->wr_ptr (rb.size ());
ACE_DEBUG ((LM_DEBUG,
"enqueueing message of size %d\n",
mb->msg_priority ()));
// Enqueue in priority order.
if (msg_queue->enqueue_prio (mb) == -1)
ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
for getting from queue:
ACE_Message_Block *mb = 0;
msg_queue->dequeue_head (mb) == -1;
int length = ACE_Utils::truncate_cast<int> (mb->length ());
if (length > 0)
ACE_OS::puts (mb->rd_ptr ());
// Free up the buffer memory and the Message_Block.
ACE_Allocator::instance ()->free (mb->rd_ptr ());
mb->release ();
The advantage is you can change the synchronization primitive very easily without having to write too much of code.
Upvotes: 0
Reputation: 12829
This is normally solved using mutexes, or other multi-thread protection mechanisms.
If you are working on windows, MFC provides the CMutex class for this problem.
If you are working on a posix system, the posix api provides the pthread_mutex_lock
, pthread_mutex_unlock
, and pthread_mutex_trylock
functions.
Some basic pseudocode would be handy to demonstrate their use in your case:
pthread_mutex_t mutex; *or* CMutex mutex;
Q queue; // <-- both mutex and queue are global state, whether they are
// global variables, or passed in as parameters, they must
// be the shared by all threads.
int threadA(/* params */){
while( threadAStillRunning ){
// perform some non-critical actions ...
pthread_mutex_lock(mutex) *or* mutex.Lock()
// perform critical actions ...
msg = queue.receiveMessage()
pthread_mutex_unlock(mutex) *or* mutex.Unlock()
// perform more non-critical actions
}
}
int threadBorC(/* params */){
while( theadBorCStillRunning ){
// perform some non-critical actions ...
pthread_mutex_lock(mutex) *or* mutex.Lock()
// perform critical actions ...
queue.sendMessage(a_msg)
pthread_mutex_unlock(mutex) *or* mutex.Unlock()
}
}
For all three threads, their ability to act on the queue hinges on their ability to acquire the mutex - they will simply block and wait until the mutex is acquired. This prevents conflicts arising from the use of that resource.
Upvotes: 4