gipinani
gipinani

Reputation: 14418

MassTransit exchange creation

I'm using MassTransit with RabbitMQ on a .net core 6 web application. My goal is to keep in sync several instances of an application, running on different plants. The application needs to be able to publish / consume messages.

When a site publishes something, this is broadcasted to all the sites queues (also itself, it will simply discard the message).

In order to do it, I configured MassTransit queue names with the suffix of the plant: eg norm-queue-CV, norm-queue-MB. I configured also the Consumer to bind to a generic fanout exchange name (norm-exchange).

Here my configuration extract:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMassTransit(x =>
    {
        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
        {

            cfg.Host(new Uri(_configuration["RabbitMQ:URI"] + _configuration["RabbitMQ:VirtualHost"]), $"ENG {_configuration["Application:PlantID"]} Producer", h =>
            {
                h.Username(_configuration["RabbitMQ:UserName"]);
                h.Password(_configuration["RabbitMQ:Password"]);
            });

            cfg.Publish<NormCreated>(x =>
            {
                x.Durable = true;
                x.AutoDelete = false;
                x.ExchangeType = "fanout"; // default, allows any valid exchange type
            });
        }));
    });

    // consumer
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(new Uri(_configuration["RabbitMQ:URI"] + _configuration["RabbitMQ:VirtualHost"]), $"ENG {_configuration["Application:PlantID"]} Consumer", h =>
        {
            h.Username(_configuration["RabbitMQ:UserName"]);
            h.Password(_configuration["RabbitMQ:Password"]);

        });

        cfg.ReceiveEndpoint($"norm-queue-{_configuration["Application:PlantID"]}", e =>
        {
            e.Consumer<NormConsumer>();
            e.UseConcurrencyLimit(1);
            e.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
            e.Bind<NormCreated>();
            e.Bind("norm-exchange");
        });
    });
    busControl.Start();

And here how NormConsumer is defined

public class NormConsumer : IConsumer<NormCreated>
{
    private readonly ILogger<NormConsumer>? logger;

    public NormConsumer()
    {

    }

    public NormConsumer(ILogger<NormConsumer> logger)
    {
        this.logger = logger;
    }

    public async Task Consume(ConsumeContext<NormCreated> context)
    {
        logger.LogInformation("Norm Submitted: {NormID}", context.Message.NormID);

        //await context.Publish<NormCreated>(new
        //{
        //    context.Message.OrderId
        //});
    }
}

Here the queues automatically created. To me they look fine enter image description here

And here the exchange created. I was trying to get only one exchange (norm-exchange), but also the other 2 are created. enter image description here

My problem is first of all understand if my layout makes sense (I'm quite new to Rabbit/Masstransit).

Moreover I'd like to override how exchanges are named, forcing to have for this queues only one exchange: "norm-exchange". I tried to override it in "producer" part, but not able to do it

Upvotes: 1

Views: 2652

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33565

RabbitMQ broker topology is covered extensively in RabbitMQ - The Details, and also in the documentation.

You do not need to call Bind in the receive endpoint, consumer message types are already bound for you. Remove both Bind statements, and any published messages will be routed by type to the receive endpoints.

Upvotes: 2

Related Questions