Reputation: 187
I have the following simple code in a regular console application, and would expect (some of) the commands to be consumed in parallel. I thought that the UseConcurrencyLimit
would set the number of concurrent threads. What I am seeing is that the RabbitMQ does have 10 unacked messages, but that the consumer consumes them serially, with a one second pause between each console.writeline. I must be missing something obvious, but I don't get it.
public static class EventHandler
{
public static void Run()
{
var personQueueName = "RabbitMqPoc.Person";
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.UseConcurrencyLimit(10);
var host = cfg.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, personQueueName, e =>
{
e.UseConcurrencyLimit(10);
e.PrefetchCount = 10;
e.Consumer<PersonConsumer>();
});
});
var personSendEndpoint = busControl.GetSendEndpoint(new Uri($"rabbitmq://localhost/{personQueueName}")).Result;
busControl.Start();
foreach (var x in Enumerable.Range(1, 20))
{
personSendEndpoint.Send(new PersonUpdated() { Name = "Mina Ives", Key = Guid.NewGuid() });
}
Console.ReadLine();
busControl.Stop();
}
}
internal class PersonConsumer : IConsumer<IPersonUpdated>
{
public async Task Consume(ConsumeContext<IPersonUpdated> context)
{
Thread.Sleep(1000);
Console.WriteLine($"Updated {context.Message.Name}");
}
}
Upvotes: 3
Views: 2571
Reputation: 33512
Changing Thread.Sleep(1000);
to await Task.Delay(1000);
resolves the issue that you're seeing.
Thread.Sleep wreaks havoc on the TPL for some reason.
Upvotes: 3