Chris Hayes

Reputation: 4033

MassTransit: handle/consume messages in batches

I have a clear understanding of consuming messages: http://docs.masstransit-project.com/en/latest/usage/consumer.html

However, these implementations handle only ONE message at a time.

I need to handle multiple messages at a time, in bulk, in batches.

Upvotes: 4

Views: 3864

Answers (2)

Will

Reputation: 151

MassTransit now has an experimental feature for processing individual messages in a batch.

Configure your bus:

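_massTransitBus = Bus.Factory.CreateUsingRabbitMq(
                cfg =>
                    {
                        // The host configurator gets its own name (h); reusing "cfg" here
                        // would clash with the outer lambda parameter and fail to compile.
                        var host = cfg.Host(new Uri("amqp://@localhost"),
                            h =>
                                {
                                    h.Username("");
                                    h.Password("");
                                });

                        cfg.ReceiveEndpoint(
                            host,
                            "queuename",
                            e =>
                                {
                                    // Prefetch must be at least the batch size, or the batch can never fill.
                                    e.PrefetchCount = 30;
                                    e.Batch<MySingularEvent>(
                                        ss =>
                                            {
                                                ss.MessageLimit = 30;
                                                ss.TimeLimit = TimeSpan.FromMilliseconds(1000);
                                                ss.Consumer(() => new BatchSingularEventConsumer());
                                            });
                                });
                    });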

Then create your consumer:

public class BatchSingularEventConsumer: IConsumer<Batch<MySingularEvent>>
{
    public Task Consume(ConsumeContext<Batch<MySingularEvent>> context)
    {
        Console.WriteLine($"Number of messages consumed {context.Message.Length}");
        return Task.CompletedTask;
    }
}
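
To work with the individual messages rather than just the count, the batch can be walked item by item. The following is a minimal sketch, not a definitive implementation: it assumes that `Batch<T>` exposes an indexer returning a `ConsumeContext<T>` for each entry (alongside the `Length` property used above), and both the class name `LoggingBatchConsumer` and the empty `MySingularEvent` type are hypothetical stand-ins for your own types.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message type standing in for the MySingularEvent used above.
public class MySingularEvent
{
}

// Hypothetical consumer name; it handles the whole batch in one Consume call.
public class LoggingBatchConsumer : IConsumer<Batch<MySingularEvent>>
{
    public Task Consume(ConsumeContext<Batch<MySingularEvent>> context)
    {
        Batch<MySingularEvent> batch = context.Message;

        for (int i = 0; i < batch.Length; i++)
        {
            // Each entry is assumed to carry its own ConsumeContext,
            // so per-message headers and the payload (item.Message) stay available.
            ConsumeContext<MySingularEvent> item = batch[i];
            Console.WriteLine($"Message {i + 1} of {batch.Length}: {item.MessageId}");
        }

        return Task.CompletedTask;
    }
}
```

It would be registered the same way as the consumer above, for example `ss.Consumer(() => new LoggingBatchConsumer());`.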

You can configure your Batch with a Message Limit and a Time Limit.

I suggest reading Chris Patterson's issue on the matter, "Batch Message Consumption", especially the part regarding prefetch:

The batch size must be less than or equal to any prefetch counts or concurrent message delivery limits in order to reach the size limit. If other limits prevent the batch size from being reached, the consumer will never be called.

Batch consumption is also documented on the MassTransit website.

Upvotes: 4

Sipke Schoorstra

Reputation: 3409

As it turns out, today you can do this:

public class MyConsumer : IConsumer<Batch<MyMessage>>
{
    public async Task Consume(ConsumeContext<Batch<MyMessage>> context)
    {
        ...
    }
}

Upvotes: 3
