OaicStef

Reputation: 187

Azure Service Bus - Session messages are delivered to multiple instances

I'm working on a system in which I need to handle ordered messages through a service bus. At my company everything runs on the Azure cloud, so I'm using Azure Service Bus for messaging. I read about sessions, and they seem to solve my problem out of the box, but it only works with a single instance. Once I scale the WebJob out to multiple instances, more than one instance consumes messages from the same session. In my POC I'm using a topic with a subscription.

Has anyone made this work when scaling the receiver out to multiple instances? My second option is to implement the Resequencer enterprise integration pattern myself. Do you have any suggestions?

Here is the code I'm using to receive the messages:

    static async Task InitializeReceiver(string connectionString, string queueName, CancellationToken ct)
    {
        var receiverFactory = MessagingFactory.CreateFromConnectionString(connectionString);

        ct.Register(() => receiverFactory.Close());

        var client = receiverFactory.CreateSubscriptionClient(queueName, "parallel", ReceiveMode.ReceiveAndDelete);

        client.RegisterSessionHandler(
            typeof(SessionHandler),
            new SessionHandlerOptions
            {
                AutoRenewTimeout = TimeSpan.FromMinutes(5),
                MessageWaitTimeout = TimeSpan.FromSeconds(120),
                MaxConcurrentSessions = 100,
                AutoComplete = false
            });
    }

Here is the code I use for processing:

    public async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
        await ProcessMessage(session, message, currentInstanceId, recipeStep);
        // If I process the last message of the session
        if (message.Sequence == numberOfMsgPerSession)
        {
            // end of the session!
            await session.CloseAsync();
            logger.LogInformation($"Session with id {message.SessionId} is completed.");
        }
    }

Thanks. Regards.

Upvotes: 0

Views: 1125

Answers (2)

OaicStef

Reputation: 187

I solved it. As you can see in the code I posted in the question, I was closing each session when I processed its last message, based on the sequence number. Once I removed that piece of code, everything started working properly. I sent 200 messages per session across 10 sessions: the messages are processed in order within each session, and no more than one instance takes a session, as I expected.
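For reference, here is a minimal sketch of what the handler might look like after that change, assuming the WindowsAzure.ServiceBus SDK (`Microsoft.ServiceBus.Messaging`) used in the question. `ProcessMessage` is a hypothetical stand-in for the application logic, and the logging is simplified to `Console.WriteLine`. The only point it illustrates is that the handler no longer calls `session.CloseAsync()`; as far as I understand, the handler pump owns the session and releases it once no message arrives within `MessageWaitTimeout`.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// Sketch of the session handler after the fix (not the exact production code).
public class SessionHandler : IMessageSessionAsyncHandler
{
    public async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
        // ProcessMessage is a hypothetical placeholder for the real processing.
        await ProcessMessage(session, message);

        // Deliberately no session.CloseAsync() here: closing the session from
        // inside the handler was the change that caused the problem above.
    }

    public Task OnCloseSessionAsync(MessageSession session)
    {
        // Invoked when the pump closes the session; a natural place for cleanup.
        Console.WriteLine("Session {0} closed.", session.SessionId);
        return Task.FromResult(0);
    }

    public Task OnSessionLostAsync(Exception exception)
    {
        // Invoked when the session lock is lost, e.g. if renewal fails.
        Console.WriteLine("Session lost: {0}", exception.Message);
        return Task.FromResult(0);
    }

    private static Task ProcessMessage(MessageSession session, BrokeredMessage message)
    {
        Console.WriteLine("Session {0}: message {1}", message.SessionId, message.SequenceNumber);
        return Task.FromResult(0);
    }
}
```

The registration code from the question (`RegisterSessionHandler(typeof(SessionHandler), ...)`) can stay unchanged with a handler shaped like this.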

Thank you all. OaicStef

Upvotes: 0

Arunprabhu

Reputation: 1494

I used timers to trigger my methods in WebJobs. When scaling out, the number of timers increased in proportion to the number of instances. So I replaced the timers with Azure Service Bus topic subscriptions. Because each message can be received only once, the methods are triggered only once even when more than one instance is running.

I had an active receiver listening for the messages; check here for how to create an active listener.

Since you say the messages are received by multiple instances, you may have peeked the messages instead of receiving them. Peeking a message does not delete it from the subscription. Only receiving it (and, in PeekLock mode, completing it) removes it from the subscription, so that it is no longer available to other receivers.
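To illustrate the difference, here is a rough sketch using the same `Microsoft.ServiceBus.Messaging` SDK. It assumes a subscription that does not require sessions (on a session-enabled subscription you would first accept a session and call the same methods on it). The class name `PeekVersusReceive` and the parameter names are made up for the example.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public static class PeekVersusReceive
{
    // topicPath and subscriptionName are placeholders for your own entities.
    public static async Task RunAsync(string connectionString, string topicPath, string subscriptionName)
    {
        var client = SubscriptionClient.CreateFromConnectionString(
            connectionString, topicPath, subscriptionName, ReceiveMode.PeekLock);

        // Peek only browses: no lock is taken and the message stays in the
        // subscription, so any other instance can still see or receive it.
        BrokeredMessage peeked = await client.PeekAsync();
        if (peeked != null)
        {
            Console.WriteLine("Peeked message {0} (still in the subscription)", peeked.MessageId);
        }

        // Receive locks the message for this receiver; completing it removes
        // it from the subscription so that no other receiver gets it.
        BrokeredMessage received = await client.ReceiveAsync(TimeSpan.FromSeconds(5));
        if (received != null)
        {
            Console.WriteLine("Received message {0}", received.MessageId);
            await received.CompleteAsync();
        }

        await client.CloseAsync();
    }
}
```

With `ReceiveMode.ReceiveAndDelete`, as in the question, the message is removed as soon as it is received and the `CompleteAsync` call must be left out.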

Upvotes: 1
