Reputation: 848
I am currently having an issue of losing a message. This error occurs rarely, but happens often enough to be annoying. Here is the context of the issue:
Is it possible for the timeout and message receive to occur at the same time? Is there a better way for me to handle service stop checking to help avoid this error?
private void workerFunction()
{
logger.Info("Connecting to queue: " + Settings.Default.goldmine_service_queue);
MessageQueue q = new MessageQueue(Settings.Default.goldmine_service_queue);
q.Formatter = new ActiveXMessageFormatter();
while (serviceStarted)
{
Message currentMessage = null;
try
{
currentMessage = q.Peek(new TimeSpan(0,0,30));
}
catch (System.Messaging.MessageQueueException mqEx)
{
if (mqEx.ToString().Contains("Timeout for the requested operation has expired"))
{
logger.Info("Check for service stop request");
}
else
{
logger.Error("Exception while peeking into MSMQ: " + mqEx.ToString());
}
}
catch (Exception e)
{
logger.Error("Exception while peeking into MSMQ: " + e.ToString());
}
if (currentMessage != null)
{
logger.Info(currentMessage.Body.ToString());
try
{
ProcessMessage(currentMessage);
}
catch (Exception processMessageException)
{
logger.Error("Error in process message: " + processMessageException.ToString());
}
//Remove message from queue.
logger.Info("Message removed from queue.");
q.Receive();
//logPerformance(ref transCount, ref startTime);
}
}//end while
Thread.CurrentThread.Abort();
}
Upvotes: 6
Views: 8760
Reputation: 4687
Just some comments to clarify how MSMQ works here.
"I can prove that the message is being inserted into goldmine_service_queue since the message appears in the message journal.
The message goes into the Journal queue when the original message is removed from the goldmine_service_queue. So you can say that the message was successfully delivered to the queue AND successfully removed from the queue.
"I strongly suspect that my issue might relate to MessageQueue.Peek and time out behavior."
A Peek does nothing to remove the message from the queue. Only "q.Receive();" does that. In your code, there is no explicit connection between the message being peeked and the one being received. "q.Receive();" just says "receive message from top of the queue". In a multi-threaded environment, you could expect to have messages inconsistently read - some could be peeked and processed multiple times. You should be obtaining the ID of the Peeked message and using ReceiveByID so you can only receive the peeked message.
Upvotes: 3
Reputation: 1630
I am will to be that changing currentMessage = q.Peek(new TimeSpan(0,0,30));
to currentMessage = q.Receive();
will fix your problem. I've been using MSMQ for message passing in the exact same context but only use peek (and expect a timeout exception) to determine if they queue is empty. The call to Receive
is blocking though so plan accordingly.
Edit: also for your exception catching - you can check if your exception is a timeout exception by comparing the error code.
mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout
where mqe is your message queue exception. You might like that better than a string comparison.
Upvotes: 1
Reputation: 108975
I don't think any messages should be missed based on a quick review, but you are working in a very odd way with lots of scope for race conditions.
Why not just receive the message and pass it to ProcessMessage
(if ProcessMessage
fails, you are performing a read anyway). If you need to handle multiple receivers, then do the receive in an MSMQ transaction so the message is unavailable to other receivers but not removed from the queue until the transaction is committed.
Also, rather than polling the queue, why not do an asynchronous receive and let the thread pool handle the completion (where you must call EndReceive
). This saves tying up a thread, and you don't need to special case service shutdown (close the message queue and then call MessageQueue.ClearConnectionCache();
).
Also, aborting the thread is a really bad way to exit, just return from the thread's start function.
Upvotes: 5