user606521

Reputation: 15434

How to configure some types/groups of consumers independently of others?

Until now I have been using the following code to add all consumers and configure the endpoints:

services.AddMassTransit(x =>
{
    x.AddConsumers(assemblyToLoadConsumersFrom);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(settings.ConnectionString));

        cfg.ConfigureEndpoints(context);
    });
});

However, I now have different kinds of consumers, defined by abstract classes such as CommandResponseConsumerBase and QueryConsumerBase. These abstract classes implement the IConsumer interface, and the actual consumer implementations inherit from them. Each consumer implementation takes constructor arguments that must be injected by DI (I'm using Microsoft DI, and the code above injects them correctly).

The problem is that, for example, I want to call endpointConfiguration.DiscardFaultedMessages() for all consumers derived from QueryConsumerBase, but keep the default (do not discard) for the other consumers, such as those derived from CommandResponseConsumerBase. I also want to keep the default topology and endpoint names, like the ones assigned by the code above.

I also don't want to configure each consumer separately, because I have a lot of them and it would be tedious. Instead, I want to find all CommandResponseConsumerBase consumers in the assembly, register them, and configure them all at once, and then do the same for all QueryConsumerBase consumers.

The code would look something like this:

services.AddMassTransit(x =>
{
    // x.AddConsumers(assemblyToLoadConsumersFrom);

    assemblyToLoadConsumersFrom
        .GetAllImplementations(typeof(CommandResponseConsumerBase<,>)).ToList() // returns classes which implement CommandResponseConsumerBase<,>
        .ForEach(impl =>
        {
            x.AddConsumer(impl.ImplementedType); // this works like x.AddConsumers(assemblyToLoadConsumersFrom);
            // but how to configure consumer here?
        });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(settings.ConnectionString));

        cfg.ConfigureEndpoints(context);
    });
});

The x.AddConsumer(impl.ImplementedType) method lets me add a consumer, but I can't pass a consumer definition type because I don't have any defined, and I don't want to define one for each consumer. I also can't use x.AddConsumer<T>(...), because the consumer type is only available at runtime as a Type value. I also tried x.AddConsumer(impl.ImplementedType).Endpoint(e => e.?), but the endpoint configurator only allows changing the name, the temporary setting, and a few other settings, not something like DiscardFaultedMessages().

Using MassTransit / AspNetCore / RabbitMq 7.2.4


Final solution (thanks Chris):

// This definition class is generic, so it is not tied to any particular
// consumer type.

public class CommandResponseConsumerDefinition<TConsumer> : ConsumerDefinition<TConsumer>
    where TConsumer : class, IConsumer
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<TConsumer> consumerConfigurator)
    {
        endpointConfigurator.DiscardFaultedMessages();
    }
}

// Inside services.AddMassTransit(x => { ... }):
assemblyToLoadConsumersFrom!
    .GetAllImplementations(typeof(CommandResponseConsumerBase<,>))
    .ToList()
    .ForEach(impl =>
    {
        // create definition type that is aware of exact consumer type
        var definitionType = typeof(CommandResponseConsumerDefinition<>)
            .MakeGenericType(impl.ImplementedType);

        // Add consumer type along with definition type
        x.AddConsumer(impl.ImplementedType, definitionType);
    });
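
The GetAllImplementations extension used above is my own helper and is not shown here. A minimal sketch of a similar helper follows, assuming plain reflection over the assembly; the name GetConcreteSubclassesOf is hypothetical, and unlike my helper it returns the Type values directly rather than objects with an ImplementedType property:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public static class AssemblyScanningExtensions
{
    // Hypothetical stand-in for GetAllImplementations: returns every concrete,
    // fully closed class in the assembly that inherits from the given open
    // generic base type, e.g. typeof(CommandResponseConsumerBase<,>).
    public static IEnumerable<Type> GetConcreteSubclassesOf(
        this Assembly assembly, Type openGenericBase)
    {
        return assembly.GetTypes()
            .Where(t => t.IsClass
                        && !t.IsAbstract
                        && !t.ContainsGenericParameters
                        && InheritsFromOpenGeneric(t, openGenericBase));
    }

    // Walks the base-class chain and compares each generic ancestor's
    // generic type definition with the requested open generic type.
    private static bool InheritsFromOpenGeneric(Type type, Type openGenericBase)
    {
        for (var current = type.BaseType; current != null; current = current.BaseType)
        {
            if (current.IsGenericType
                && current.GetGenericTypeDefinition() == openGenericBase)
            {
                return true;
            }
        }

        return false;
    }
}
```

With a helper like this, the loop body would pass each returned Type straight to MakeGenericType and AddConsumer.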

Upvotes: 1

Views: 847

Answers (1)

Chris Patterson

Reputation: 33278

You might be able to create two generic consumer definitions, one for the query base and one for the command base, and then register the closed generic definition type along with each consumer when calling AddConsumer.

Something like:

public class QueryConsumerDefinition<TConsumer> :
    ConsumerDefinition<TConsumer>
    where TConsumer : QueryConsumerBase
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
    {
        endpointConfigurator.DiscardFaultedMessages();
    }
}
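
Registering the closed definition for a whole group of consumers could be sketched as below. This is only an illustration: the extension method name AddConsumersWithDefinition is hypothetical, and it assumes the registration configurator exposes the AddConsumer(Type, Type) overload through IRegistrationConfigurator and that the list of consumer types has already been collected from the assembly.

```csharp
using System;
using System.Collections.Generic;
using MassTransit;

public static class ConsumerGroupRegistrationExtensions
{
    // Hypothetical helper: pairs each consumer type with a closed version of
    // the open generic definition, e.g. typeof(QueryConsumerDefinition<>).
    public static void AddConsumersWithDefinition(
        this IRegistrationConfigurator x,
        IEnumerable<Type> consumerTypes,
        Type openDefinitionType)
    {
        foreach (var consumerType in consumerTypes)
        {
            // Close the definition over the concrete consumer type.
            // MakeGenericType throws an ArgumentException if the consumer
            // does not satisfy the definition's generic constraints.
            var definitionType = openDefinitionType.MakeGenericType(consumerType);

            // Register the consumer together with its definition.
            x.AddConsumer(consumerType, definitionType);
        }
    }
}
```

Inside AddMassTransit this would be called once per group, for example x.AddConsumersWithDefinition(queryConsumerTypes, typeof(QueryConsumerDefinition<>)), where queryConsumerTypes is an assumed variable holding the discovered types. Consumers that should keep the default behaviour can be added without a definition.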

Upvotes: 1
