David McClelland
David McClelland

Reputation: 2756

MassTransit / RabbitMQ - why so many messages get skipped?

I'm working with 2 .NET Core console applications in a producer/consumer scenario with MassTransit/RabbitMQ. I need to ensure that even if NO consumers are up-and-running, the messages from the producer are still queued up successfully. That didn't seem to work with Publish() - the messages just disappeared, so I'm using Send() instead. The messages at least get queued up, but without any consumers running the messages all end up in the "_skipped" queue.

So that's my first question: is this the right approach based on the requirement (even if NO consumers are up-and-running, the messages from the producer are still queued up successfully)?

With Send(), my consumer does indeed work, but still many messages are falling through the cracks and getting dumped into to the "_skipped" queue. The consumer's logic is minimal (just logging the message at the moment) so it's not a long-running process.

So that's my second question: why are so many messages still getting dumped into the "_skipped" queue?

And that leads into my third question: does this mean my consumer needs to listen to the "_skipped" queue as well?


I am unsure what code you need to see for this question, but here's a screenshot from the RabbitMQ management UI:

RabbitMQ queues

Producer configuration:

    static IHostBuilder CreateHostBuilder(string[] args)
    {
        return Host.CreateDefaultBuilder()
                      .ConfigureServices((hostContext, services) =>
                      {
                          services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));

                          services.AddMassTransit(cfg =>
                          {
                              cfg.AddBus(ConfigureBus);
                          });

                          services.AddHostedService<CardMessageProducer>();
                      })
                      .UseConsoleLifetime()
                      .UseSerilog();
    }

    static IBusControl ConfigureBus(IServiceProvider provider)
    {
        var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
            {
                h.Username(options.RabbitMQ_Username);
                h.Password(options.RabbitMQ_Password);
            });

            cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
            {
                EndpointConvention.Map<CardMessage>(e.InputAddress);
            });
        });
    }

Producer code:

Bus.Send(message);

Consumer configuration:

    static IHostBuilder CreateHostBuilder(string[] args)
    {
        return Host.CreateDefaultBuilder()
                      .ConfigureServices((hostContext, services) =>
                      {
                          services.AddSingleton<CardMessageConsumer>();

                          services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));

                          services.AddMassTransit(cfg =>
                          {
                              cfg.AddBus(ConfigureBus);
                          });

                          services.AddHostedService<MassTransitHostedService>();
                      })
                      .UseConsoleLifetime()
                      .UseSerilog();
    }

    static IBusControl ConfigureBus(IServiceProvider provider)
    {
        var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
            {
                h.Username(options.RabbitMQ_Username);
                h.Password(options.RabbitMQ_Password);
            });

            cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
            {
                e.Consumer<CardMessageConsumer>(provider);
            });

            //cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName + "_skipped", e =>
            //{
            //    e.Consumer<CardMessageConsumer>(provider);
            //});
        });
    }

Consumer code:

class CardMessageConsumer : IConsumer<CardMessage>
{
    private readonly ILogger<CardMessageConsumer> logger;
    private readonly ApplicationConfiguration configuration;
    private long counter;

    public CardMessageConsumer(ILogger<CardMessageConsumer> logger, IOptions<ApplicationConfiguration> options)
    {
        this.logger = logger;
        this.configuration = options.Value;
    }

    public async Task Consume(ConsumeContext<CardMessage> context)
    {
        this.counter++;

        this.logger.LogTrace($"Message #{this.counter} consumed: {context.Message}");
    }
}

Upvotes: 1

Views: 6928

Answers (3)

Rajeev
Rajeev

Reputation: 405

In my case, the same queue listens to multiple consumers at the same time

Upvotes: 0

maldworth
maldworth

Reputation: 111

When the consumer starts the bus bus.Start(), one of the things it does is create all exchanges and queues for the transport. If you have a requirement that publish/send happens before the consumer, your only option is to run DeployTopologyOnly. Unfortunately this feature is not documented in official docs, but the unit tests are here: https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.RabbitMqTransport.Tests/BuildTopology_Specs.cs

The skipped queue happens when messages are sent to a consumer that doesn't know how to process.

For example if you have a consumer that can process IConsumer<MyMessageA> which is on receive endpoint name "my-queue-a". But then your message producer does Send<MyMessageB>(Uri("my-queue-a")...), Well this is a problem. The consumer only understands the A, it doesn't know how to process B. And so it just moves it to a skipped queue and continues on.

Upvotes: 0

Alexey Zimarev
Alexey Zimarev

Reputation: 19640

In MassTransit, the _skipped queue is the implementation of the dead letter queue concept. Messages get there because they don't get consumed.

MassTransit with RMQ always delivers a message to an exchange, not to a queue. By default, each MassTransit endpoint creates (if there's no existing queue) a queue with the endpoint name, an exchange with the same name and binds them together. When the application has a configured consumer (or handler), an exchange for that message type (using the message type as the exchange name) also gets created and the endpoint exchange gets bound to the message type exchange. So, when you use Publish, the message is published to the message type exchange and gets delivered accordingly, using the endpoint binding (or multiple bindings). When you use Send, the message type exchange is not being used, so the message gets directly to the destination exchange. And, as @maldworth correctly stated, every MassTransit endpoint only expects to get messages that it can consume. If it doesn't know how to consume the message - the message is moved to the dead letter queue. This, as well as the poison message queue, are fundamental patterns of messaging.

If you need messages to queue up to be consumed later, the best way is to have the wiring set up, but the endpoint itself (I mean the application) should not be running. As soon as the application starts, it will consume all queued messages.

Upvotes: 2

Related Questions