Michael
Michael

Reputation: 3651

MassTransit is only batching 10 when more is configured

I'm trying to configure MassTransit batching, but when running it only batches 10 at a time.

hostHandler = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
    cfg.TrySetPrefetchCount(2000);
    cfg.Batch<T>(cfg =>
    {
        cfg.Consumer(() => consumer);
        cfg.ConcurrencyLimit = 2;
        cfg.MessageLimit = 1000;
        cfg.TimeLimit = TimeSpan.FromSeconds(1);
    });
    cfg.UseMessageRetry(r => r.Immediate(2)));
});

await hostHandler.Ready;

enter image description here

Upvotes: 2

Views: 1781

Answers (2)

Chris Patterson
Chris Patterson

Reputation: 33540

You could use the newer batch syntax as well, but it still needs to be specified prior to the Consumer call:

var handle = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
    cfg.TrySetPrefetchCount(2000);

    cfg.UseMessageRetry(r => r.Immediate(2)));

    cfg.ConfigureConsumer<YourConsumer>(context, cons =>
    {
        cons.Options<BatchOptions>(options => options
            .SetMessageLimit(1000)
            .SetTimeLimit(1000)
            .SetConcurrencyLimit(2));
    });
});

await handle.Ready;

You could also, since you're using the receive endpoint connector, configure the batch options in the consumer definition as shown in the documentation.

Upvotes: 5

Michael
Michael

Reputation: 3651

5 minutes after I posted the question I tried to change the order of the batch configuration, and putting the consumer as the last statement, did the trick.

hostHandler = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
    cfg.TrySetPrefetchCount(2000);
    cfg.Batch<T>(cfg =>
    {
        cfg.ConcurrencyLimit = 2;
        cfg.MessageLimit = 1000;
        cfg.TimeLimit = TimeSpan.FromSeconds(1);
        cfg.Consumer(() => consumer);
    });
    cfg.UseMessageRetry(r => r.Immediate(2)));
});

await hostHandler.Ready;

Upvotes: 0

Related Questions