Rebecca
Rebecca

Reputation: 14412

MassTransit - Explanation of PrefetchCount and multiple channels for a single consumer

I've been playing around with PreFetch, and trying to work out why the PreFetch is always set to 0 on the Management interface for the queue. In the RabbitMQ Management Interface I can see the configured Prefetch on the channels, but not the queue itself. I've also noticed that they are registered as "global" rather than "per consumer", but for the life of me I can't seem to find the setting to change that in MassTransit, although I'm guessing I have a misunderstanding of how this works, and the docs haven't helped to give me an ELI5.

This is an example config:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
   var host = cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });

    cfg.ReceiveEndpoint(
        host,
        "TEST-QUEUE-PF",
        ec =>
        {
            ec.Consumer<MyConsumer>(context);
            ec.PrefetchCount = 50; // consumer specific
            ec.UseConcurrencyLimit(1); // consumer specific
        });

    cfg.PrefetchCount = 100; // bus control specific
    cfg.UseConcurrencyLimit(1); // bus control specific
});

This creates the following queue:

Queue

And then looking at the channel I see the following information about prefetch:

enter image description here

And if I look at all the channels I see the following:

enter image description here

I am struggling to understand what each of these PrefetchCounts relates to.

As a bit of background, we have several multi-core servers running consumers (i.e. Round-Robin, or more appropriately "Hungry Hippo" since I don't care about equal distribution). The default settings for PrefetchCount and ConcurrencyLimit are not working for us very well, because our consumer has quite a lot of work to do, and it's overloading the database server causing timeouts. I'm looking to find a way to configure these consumers so that they don't do that.

This is MassTransit 5.5.5, since anything over that breaks the UseSerilog() integration, and I can't find an easy upgrade path. Erlang and RabbitMq themselves are the current versions. This is the AutoFac module in more detail:

private class BusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
        builder.Register(context =>
        {
            var busSettings = context.Resolve<BusSettings>();
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(
                    new Uri(busSettings.HostAddress),
                    h =>
                    {
                        h.Username(busSettings.Username);
                        h.Password(busSettings.Password);
                    });

                cfg.ReceiveEndpoint(
                    host,
                    $"TEST-QUEUE-GLOBAL", // shared queue name for all nodes
                    ec =>
                    {
                        ec.PrefetchCount = 50;
                        ec.UseConcurrencyLimit(2);
                        ec.Consumer<MyConsumer>(context);
                        ec.EnablePriority(5);
                        ec.UseRetry(retryConfig =>
                        {
                            retryConfig
                                .Intervals(new[] { 1, 2, 4, 8, 16, 32 }
                                .Select(t => TimeSpan.FromMinutes(t))
                                .ToArray());
                            retryConfig
                                .Handle<HttpRequestException>();
                            retryConfig
                                .Handle<SwaggerException>(ex => ex.IsRetryValid());
                        });
                    });

                cfg.PrefetchCount = 100;
                cfg.UseConcurrencyLimit(2);
                cfg.UseSerilog();

                var correlationIdProvider = context.Resolve<ICorrelationProvider>();
                cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
                {
                    sendContext.CorrelationId = 
                        sendContext.CorrelationId == Guid.Empty ? 
                            correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
                }));
            });

            return busControl;
        })
        .SingleInstance()
        .As<IBusControl>()
        .As<IBus>();
    }
}

Upvotes: 5

Views: 8311

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33457

First, I assume you're on an older version of MassTransit, as the switch was made to move away from a global prefetch starting with v6.

Second, a high prefetch count combined with a concurrency limit of 1 is going to cause (prefetchcount - 1) messages to sit on a receive endpoint waiting to process while 1 message processes at a time. So, if there are only 50 messages, the first node may get them all and then your other nodes are idle since the messages are waiting on a single node to get through the bottleneck.

A current version of the RabbitMQ Management console, with the channel prefetch is shown below:

RabbitMQ Prefetch Count

Since MassTransit only puts a single consumer on a channel, the previous approach essentially limited the consumer to the global channel prefetch, but now it's more explicit. Also, the new setting works with quorum queues, which don't support a global prefetch setting.

If you're overloading your database, and have already optimized the database query to avoid locking/blocking, and need to reduce the flow, lower the prefetch to be closer to something like 140% of your concurrency limit. So, seriously, if you're at a 1, set prefetch to maybe 2.

Upvotes: 4

Related Questions