Stix
Stix

Reputation: 334

MassTransit Azure Service Bus - Consuming faults using Topics/Subscriptions

I am currently testing out MassTransit exception handling, and am struggling to get Fault messages published. Ideally I'd like a Subscription consumer to retry and then if a fault is thrown, have another consumer pick this up, so far I have a simple setup:

  1. A test consumer with retry definition:
public class TestConsumer : IConsumer<MyEvent>
{
    public async Task Consume(ConsumeContext<MyEvent> context)
    {
        throw new Exception("Whoops");
    }
}

public class TestConsumerDefinition : ConsumerDefinition<TestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TestConsumer> consumerConfigurator)
    {
        base.ConfigureConsumer(endpointConfigurator, consumerConfigurator);
        consumerConfigurator.UseMessageRetry(configureRetry => configureRetry.Interval(5, TimeSpan.FromMilliseconds(500)));
    }
  1. A fault consumer:
public class TestFaultConsumer : IConsumer<Fault<MyEvent>>
{
    public async Task Consume(ConsumeContext<Fault<MyEvent>> context)
    {
        return; // Handle something here
    }
}

Registered as follows:

services.AddMassTransit(busRegistrationConfigurator =>
        {
            busRegistrationConfigurator.AddConsumer<TestConsumer, TestConsumerDefinition>();
            busRegistrationConfigurator.AddConsumer<TestFaultConsumer>();

            busRegistrationConfigurator.UsingAzureServiceBus((busRegistrationContext, serviceBusFactoryConfigurator) =>
            {
                serviceBusFactoryConfigurator.Host(configuration["ServiceBusSettings:ConnectionString"]);

                serviceBusFactoryConfigurator.SubscriptionEndpoint("test-consumer", "my-topic", cfg =>
                {
                    cfg.ConfigureConsumer<TestConsumer>(busRegistrationContext);
                });
            });
        });

When the exception is thrown in the first consumer, there is no fault queue created in ASB. Is there a piece of setup I am missing to configure the error queue in ASB and have the Fault be consumed?

Also as a final aside, I'd also like to get this Fault consumption working within an Azure function, is this possible or is this not supported with ASB/Functions?

Any help would be great, thanks

Upvotes: 0

Views: 635

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33457

First, your consumer definition should be configuring retry on the endpoint, not the consumer:

public class TestConsumerDefinition : ConsumerDefinition<TestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TestConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(configureRetry => configureRetry.Interval(5, TimeSpan.FromMilliseconds(500)));
    }
}

Second, you aren't calling ConfigureEndpoints, so there won't be an endpoint created for the fault consumer as it isn't configured. You should configure it after the subscription endpoint is configured:

busRegistrationConfigurator.UsingAzureServiceBus((busRegistrationContext, serviceBusFactoryConfigurator) =>
{    serviceBusFactoryConfigurator.Host(configuration["ServiceBusSettings:ConnectionString"]);

    serviceBusFactoryConfigurator.SubscriptionEndpoint("test-consumer", "my-topic", cfg =>
    {
        cfg.ConfigureConsumer<TestConsumer>(busRegistrationContext);
    });

    serviceBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);
});

As to the part about Azure Functions, that would require configuring everything in Azure Service Bus manually, since AF is the transport and requires everything to exist up front. There is a sample, that might give you some guidance.

I am not a fan of Azure Functions, as I find the deployment model painful, but it is supported by MassTransit.

Upvotes: 1

Related Questions