Reputation: 4033
I understand how to consume messages (http://docs.masstransit-project.com/en/latest/usage/consumer.html), but
these implementations only handle ONE message at a time.
I need to handle multiple messages at a time, in bulk, in batches.
Upvotes: 4
Views: 3864
Reputation: 151
MassTransit now has an experimental feature for processing individual messages in a batch.
Configure your bus:
    _massTransitBus = Bus.Factory.CreateUsingRabbitMq(
        cfg =>
        {
            // The inner lambda parameter is named "h" so it does not shadow the outer "cfg".
            var host = cfg.Host(new Uri("amqp://@localhost"),
                h =>
                {
                    h.Username("");
                    h.Password("");
                });
            cfg.ReceiveEndpoint(
                host,
                "queuename",
                e =>
                {
                    // Prefetch must be at least as large as the batch's MessageLimit.
                    e.PrefetchCount = 30;
                    e.Batch<MySingularEvent>(
                        ss =>
                        {
                            ss.MessageLimit = 30;
                            ss.TimeLimit = TimeSpan.FromMilliseconds(1000);
                            ss.Consumer(() => new BatchSingularEventConsumer());
                        });
                });
        });
And create your consumer:
    public class BatchSingularEventConsumer : IConsumer<Batch<MySingularEvent>>
    {
        public Task Consume(ConsumeContext<Batch<MySingularEvent>> context)
        {
            Console.WriteLine($"Number of messages consumed {context.Message.Length}");
            return Task.CompletedTask;
        }
    }
You can configure your batch with a message limit (MessageLimit) and a time limit (TimeLimit).
I suggest reading Chris Patterson's issue on the matter, "Batch Message Consumption", especially the part regarding prefetch:
"The batch size must be less than or equal to any prefetch counts or concurrent message delivery limits in order to reach the size limit. If other limits prevent the batch size from being reached, the consumer will never be called."
Batch consumption is also documented on the MassTransit website.
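To make the interaction between the message limit and the time limit concrete, here is a small standalone sketch of "close the batch on size OR time". It is not MassTransit code and does not claim to mirror its internals: BatchModel and Split are hypothetical names, and the model assumes the time window opens when the first message of a batch arrives.

```csharp
using System;
using System.Collections.Generic;

// Simplified, standalone model of size-or-time batching.
// NOT MassTransit's implementation; BatchModel and Split are hypothetical names.
public static class BatchModel
{
    // arrivals: message arrival offsets in ascending order.
    // Assumption: the time window starts at the first message of each batch.
    public static List<List<TimeSpan>> Split(
        IEnumerable<TimeSpan> arrivals, int messageLimit, TimeSpan timeLimit)
    {
        var batches = new List<List<TimeSpan>>();
        var current = new List<TimeSpan>();

        foreach (var arrival in arrivals)
        {
            // Time limit: the window opened by the first message has expired,
            // so the pending batch is delivered before this message is added.
            if (current.Count > 0 && arrival - current[0] >= timeLimit)
            {
                batches.Add(current);
                current = new List<TimeSpan>();
            }

            current.Add(arrival);

            // Message limit: the batch is full and is delivered immediately.
            if (current.Count == messageLimit)
            {
                batches.Add(current);
                current = new List<TimeSpan>();
            }
        }

        // In a live system the timer would eventually flush the remainder.
        if (current.Count > 0)
            batches.Add(current);

        return batches;
    }
}
```

In this model, a message limit of 3 and a time limit of one second, with arrivals at 0, 100, 200, 1500 and 1600 ms, yield two batches of 3 and 2 messages. It also suggests why prefetch matters: if the broker hands over fewer unacknowledged messages than the message limit, the size condition cannot be met and only the time limit can close the batch.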
Upvotes: 4
Reputation: 3409
As it turns out, today you can do this:
    public class MyConsumer : IConsumer<Batch<MyMessage>>
    {
        public async Task Consume(ConsumeContext<Batch<MyMessage>> context)
        {
            ...
        }
    }
Upvotes: 3