Reputation: 165
I'm using MassTransit with RabbitMQ on .NET Core 3.1. I have a dispatcher service that sends messages to worker services when they are available. Each worker service has a RabbitMQ queue in front of it, which is created when the service starts. When a worker service stops, I want its queue (and exchange) to be deleted. I got this working by setting the AutoDelete flag in the configuration (Program.cs):
services.AddMassTransit(x =>
{
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config =>
    {
        config.Host(settings.RabbitMq.Host, settings.RabbitMq.Port,
            settings.RabbitMq.VirtualHost, h =>
            {
                h.Username(settings.RabbitMq.Username);
                h.Password(settings.RabbitMq.Password);
            });
        var queueName = AssembleQueueName(settings);
        var sp = services.BuildServiceProvider();
        config.ReceiveEndpoint(queueName,
            e =>
            {
                e.Consumer(() => new MessageConsumer());
                e.AutoDelete = true;
            });
    }));
});
Unfortunately, this does not work for me because my consumer class needs the ServiceProvider, so I'm doing the following instead (Worker.cs):
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var queueName = _settings.RabbitMq.ServicePrefixQueueName + "-" + _settings.ServiceId;
    var messageHandler = _busControl.ConnectReceiveEndpoint(queueName, x =>
    {
        x.Consumer<MessageConsumer>(_serviceProvider);
    });
    await messageHandler.Ready;
    _workerWitness.IsWorkerReady = true;
}
But here I don't know how to set the AutoDelete flag. Is it even possible?
Upvotes: 0
Views: 901
Reputation: 33278
If you follow the documentation on configuring consumers with a container, you will see that consumers can be registered so that they are resolved from the container, as shown below (your code, corrected):
services.AddMassTransit(x =>
{
    // Register the consumer so the container can create it and inject its dependencies.
    x.AddConsumer<MessageConsumer>();
    x.UsingRabbitMq((context, config) =>
    {
        config.Host(settings.RabbitMq.Host, settings.RabbitMq.Port,
            settings.RabbitMq.VirtualHost, h =>
            {
                h.Username(settings.RabbitMq.Username);
                h.Password(settings.RabbitMq.Password);
            });
        var queueName = AssembleQueueName(settings);
        config.ReceiveEndpoint(queueName, e =>
        {
            e.AutoDelete = true;
            // Resolve MessageConsumer from the container for each message.
            e.ConfigureConsumer<MessageConsumer>(context);
        });
    });
});
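With the consumer resolved from the container, it should no longer need the ServiceProvider itself; its dependencies can be declared as constructor parameters instead. The following is only a minimal sketch of what that might look like. The message type `WorkMessage` and the injected `ILogger<MessageConsumer>` are hypothetical placeholders, since the original consumer and its dependencies are not shown in the question.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

// Hypothetical message contract; the real message type is not shown in the question.
public class WorkMessage
{
    public string Payload { get; set; }
}

public class MessageConsumer : IConsumer<WorkMessage>
{
    private readonly ILogger<MessageConsumer> _logger;

    // The container supplies constructor arguments when it creates the consumer,
    // so any registered service can be requested here (ILogger is just an example).
    public MessageConsumer(ILogger<MessageConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<WorkMessage> context)
    {
        _logger.LogInformation("Received: {Payload}", context.Message.Payload);
        return Task.CompletedTask;
    }
}
```

Any service the consumer needs would have to be registered in the same service collection that AddMassTransit is called on.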
Upvotes: 1