Reputation: 59
I am new to Azure and have been learning the cloud and Service Bus for the past couple of weeks.
I am trying to read messages from an Azure Service Bus queue. The QueueClient Receive() method pulls the message out of the queue, which I do not want to do initially. So I browse through all the messages in the queue one at a time in a while loop using the Peek() method, and compare each message's CorrelationId with the correlation ID I maintain internally in a local DB table.
Only if the correlation ID matches do I go ahead and receive the message. But in order to receive the message using the message sequence number, I learned that I have to defer the message first, store the message ID in a list or something similar, then use the QueueClient Receive() method to receive the message, and finally mark the message as complete.
But since I am browsing the messages using Peek(), I am not allowed to defer the message. I am stuck here on receiving the message using the message ID.
Also, I cannot just complete a message before receiving it.
Can you please suggest any way to accomplish this?
BrokeredMessage message = new BrokeredMessage();
message = null;
while ((message = reader.Peek()) != null && row_count > 0)
{
    List<long> deferredMessageReceipts = new List<long>();
    // Read Ping results table to get the rows with no msg_recv_ts
    logobj.Categories.Clear();
    logobj.Categories.Add("INFO");
    logobj.Message = "Reading ! Message: " + " Correlation ID:" + message.CorrelationId;
    Logger.Write(logobj);
    if (message != null)
    {
        if (PRTA_rows.Corr_id == message.CorrelationId) //compare correlation ids
        {
            DateTime ping_recv_ts = DateTime.Now;
            logobj.Categories.Clear();
            logobj.Categories.Add("INFO");
            string messageBody = message.GetBody<string>();
            logobj.Message = "Ack Message Found ! Message Body: " + messageBody + " Correlation ID:" + message.CorrelationId;
            Logger.Write(logobj);
            string msg_type = "PING_ACK";
            logobj.Categories.Clear();
            logobj.Categories.Add("INFO");
            logobj.Message = "Marking Message as complete...";
            Logger.Write(logobj);
            // Deferring a message
            message.Defer(); // Getting error here "The operation cannot be completed because the ReceiveContext is null."
            long msg_seq_nbr = message.SequenceNumber;
            reader.Receive(msg_seq_nbr); // This operation is not possible without deferring the message.
            message.Complete();
        }
    }
} // End while browsing messages.
Upvotes: 1
Views: 1835
Reputation: 1651
I think you should reconsider your design, as this is not how you would typically use a queuing system. Queues are used when you need temporal decoupling or load balancing, and they are meant for first-in, first-out delivery.
You may want to consider using Service Bus Topics with one Subscription per application. Topics are similar to queues but use a Pub/Sub model. You could use filters on the Topic's Subscriptions to route each message to the correct Subscription. All apps would 'publish' to the one Topic, and each application would read from its own Subscription.
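As a rough illustration of that idea, here is a minimal sketch using the same Microsoft.ServiceBus.Messaging API as the code in the question. The topic name "ping-acks", the subscription name "app-a", the custom property "TargetApp", and the connection string placeholder are all made up for the example; adjust them to your setup. It is only one possible way to wire this up, not a drop-in replacement for your code.

// Sketch only: names and values below are hypothetical.
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class TopicRoutingSketch
{
    static void Main()
    {
        // Assumed placeholders, not taken from the question.
        string connectionString = "<your Service Bus connection string>";
        string topicPath = "ping-acks";
        string subscriptionName = "app-a";

        // Create the topic and a filtered subscription if they do not exist yet.
        NamespaceManager ns = NamespaceManager.CreateFromConnectionString(connectionString);
        if (!ns.TopicExists(topicPath))
        {
            ns.CreateTopic(topicPath);
        }
        if (!ns.SubscriptionExists(topicPath, subscriptionName))
        {
            // Only messages whose custom property TargetApp equals 'app-a'
            // are copied into this subscription.
            ns.CreateSubscription(topicPath, subscriptionName, new SqlFilter("TargetApp = 'app-a'"));
        }

        // Publisher side: every app sends to the same topic and tags the message.
        TopicClient sender = TopicClient.CreateFromConnectionString(connectionString, topicPath);
        BrokeredMessage outgoing = new BrokeredMessage("PING_ACK");
        outgoing.CorrelationId = "1234";
        outgoing.Properties["TargetApp"] = "app-a";
        sender.Send(outgoing);
        sender.Close();

        // Subscriber side: each app receives only from its own subscription,
        // so there is no need to Peek() and skip over other apps' messages.
        SubscriptionClient receiver = SubscriptionClient.CreateFromConnectionString(
            connectionString, topicPath, subscriptionName, ReceiveMode.PeekLock);
        BrokeredMessage received = receiver.Receive(TimeSpan.FromSeconds(5));
        if (received != null)
        {
            Console.WriteLine("Body: " + received.GetBody<string>() +
                              " Correlation ID: " + received.CorrelationId);
            // The message was received under a lock, so Complete() is allowed here.
            received.Complete();
        }
        receiver.Close();
    }
}

Because the message comes from Receive() in PeekLock mode rather than from Peek(), it carries a lock, so calls such as Complete() or Defer() work on it.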
Upvotes: 1