Joel
Joel

Reputation: 8978

MessageLockLostException on unhandled exception in combination with ConfigureDeadLetterQueueErrorTransport for MassTransit on Azure Service Bus

I'm trying to set MassTransit up with Azure Service Bus. I want to use the DLQ instead of relying on _skipped and _error queues, but I'm not sure if I'm doing it wrong.

This is a simplified version of my setup:

services.AddMassTransit(busConfigurator =>
{
    busConfigurator.AddConsumer<MessageConsumer>();
    busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
    {
        serviceBusBusFactoryConfigurator.Host(connectionString);

        serviceBusBusFactoryConfigurator.SubscriptionEndpoint<Message>(
            "message-subscription",
            configurator =>
            {
                configurator.ConfigureDeadLetterQueueErrorTransport();
                configurator.ConfigureDeadLetterQueueDeadLetterTransport();

                configurator.ConfigureConsumer<MessageConsumer>(context);
            });
        
    });
});

services.AddMassTransitHostedService(true);


---


public class Message
{
    public string Text { get; set; }
}

public class MessageConsumer : IConsumer<Message>
{
    public Task Consume(ConsumeContext<Message> context)
    {
        throw new Exception("Hello");
    }
}

When publishing a message ( await _bus.Publish<Message>(new {})), I'm getting something like this in the logs:

[15:19:22 | DBG | MassTransit] Create send transport: sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message--
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- ()
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault ()
[15:19:22 | DBG | MassTransit] Subscription Fault-MassTransit (MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- -> sb://mytestbus.servicebus.windows.net/MassTransit/Fault)
[15:19:22 | DBG | MassTransit] SEND sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- 09c40000-5dcc-0015-85cf-08d9988339b8 MassTransit.Fault<Acme.Common.Messaging.Contracts.Events.Message>
[15:19:22 | ERR | MassTransit] R-FAULT sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription 09c40000-5dcc-0015-5c52-08d998833786 Acme.Common.Messaging.Contracts.Events.Message Acme.Services.UpdateService.MessageConsumer(00:00:03.0423556)
System.Exception: Hello
   at Acme.Services.UpdateService.MessageConsumer.Consume(ConsumeContext`1 context) in C:\Development\acme\src\Acme.Services.UpdateService\Startup.cs:line 503
   at MassTransit.Pipeline.Filters.MethodConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumerConsumeContext<TConsumer,TMessage>>.Send(ConsumerConsumeContext`2 context, IPipe`1 next)
   at GreenPipes.Pipes.LastPipe`1.Send(TContext context)
   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
[15:19:23 | WRN | MassTransit] Message Lock Lost: 09c400005dcc00155c5208d998833786
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
   at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
[15:19:23 | WRN | MassTransit] Exception on Receiver sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription during UserCallback ActiveDispatchCount(0) ErrorRequiresRecycle(False)
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
   at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
   at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
   at MassTransit.Azure.ServiceBus.Core.Contexts.SubscriptionClientContext.<>c__DisplayClass16_0.<<OnMessageAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.Azure.ServiceBus.MessageReceivePump.MessageDispatchTask(Message message)

Is this by design or am I doing something wrong? I would have thought that the message would have been moved to the DLQ, without any logged warnings (except for the exception that I throw).

If I uncomment ConfigureDeadLetterQueueErrorTransport, then I no longer get the MessageLockLostException.

Using MassTransit 7.2.3

Upvotes: 1

Views: 713

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33457

With a subscription endpoint, the DLQ is the default behavior.

What you've configured is likely conflicting with the default configuration. If you get rid of the following lines, it should work as expected:

configurator.ConfigureDeadLetterQueueErrorTransport();
configurator.ConfigureDeadLetterQueueDeadLetterTransport();

You can see that those same lines are already configured for subscription endpoints.

Those lines are only needed to configure a receive endpoint to use the DLQ.

Upvotes: 4

Related Questions