Reputation: 15434
For now I was using following code to add all consumers and configure endpoints:
services.AddMassTransit(x =>
{
x.AddConsumers(assemblyToLoadConsumersFrom);
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(settings.ConnectionString));
cfg.ConfigureEndpoints(context);
});
});
However now I have different types of consumers defined by abstract classes like CommandResponseConsumerBase
and QueryConsumerBase
(those abstract classes implement IConsumer
interface, and the actual consumer implementations inherit from those classes). Each consumer implementation requires some arguments in constructor to be injected by DI (using Microsoft DI and code above properly injects arguments to consumer constructors).
The problem is that for example for all consumers of type QueryConsumerBase
I want to set endpointConfiguration.DiscardFaultedMessages()
but keep the default (not discard) for other consumers (like CommandResponseConsumerBase
). I want also keep default topology / endpoint names like ones assigned by code above.
Also I don't want to configure consumers separately because I have a lot of them and it would be tedious. Instead I want to just find all CommandResponseConsumerBase
consumers in the assembly and register them and configure them all at once and then find all QueryConsumerBase
in assembly and register them and configure them all at once.
Code will look something like this:
services.AddMassTransit(x =>
{
// x.AddConsumers(assemblyToLoadConsumersFrom);
assemblyToLoadConsumersFrom
.GetAllImplementations(typeof(CommandResponseConsumerBase<,>)).ToList() // returns classes which implement CommandResponseConsumerBase<,>)
.ForEach(impl =>
{
x.AddConsumer(impl.ImplementedType); // this works like x.AddConsumers(assemblyToLoadConsumersFrom);
// but how to configure consumer here?
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(settings.ConnectionString));
cfg.ConfigureEndpoints(context);
});
});
The x.AddConsumer(impl.ImplementedType)
method allows me to add consumer, but I cant pass ConsumerTypeDefinition as I don't have any defined and I don't want to define one for each consumer. Also can't use x.AddConsumer<T>(...)
because I have type of consumer in a dynamic variable. Also tried x.AddConsumer(impl.ImplementedType).Endpoint(e => e.?)
but endpoint configurator allows only changing name, temporary setting, and few other settings but not something like DiscardFaultedMessages()
.
Using MassTransit / AspNetCore / RabbitMq 7.2.4
Final solution (thanks Chris):
// Actually this definition class is not even aware of exact
// consumer type.
public class CommandResponseConsumerDefinition<TConsumer> : ConsumerDefinition<TConsumer>
where TConsumer : class, IConsumer
{
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<TConsumer> consumerConfigurator)
{
endpointConfigurator.DiscardFaultedMessages();
}
}
assemblyToLoadConsumersFrom!
.GetAllImplementations(typeof(CommandResponseConsumerBase<,>))
.ToList()
.ForEach(impl =>
{
// create definition type that is aware of exact consumer type
var definitionType = typeof(CommandResponseConsumerDefinition<>)
.MakeGenericType(impl.ImplementedType);
// Add consumer type along with definition type
x.AddConsumer(impl.ImplementedType, definitionType);
});
Upvotes: 1
Views: 847
Reputation: 33278
You might be able to create two generic consumer definitions, one for the query base, and one for the command base, register the implemented generic type of the definition along with the consumer (when calling AddConsumer
).
Something like:
public class QueryConsumerDefinition<TConsumer> :
ConsumerDefinition<TConsumer>
where TConsumer : QueryConsumerBase
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
{
endpointConfigurator.DiscardFaultedMessages();
}
}
Upvotes: 1