MarkHasper
MarkHasper

Reputation: 187

Why is MassTransit only consuming messages on one single thread

I have the following simple code in a regular console application, and would expect (some of) the commands to be consumed in parallel. I thought that the UseConcurrencyLimit would set the number of concurrent threads. What I am seeing is that the RabbitMQ does have 10 unacked messages, but that the consumer consumes them serially, with a one second pause between each console.writeline. I must be missing something obvious, but I don't get it.

public static class EventHandler
{
    public static void Run()
    {
        var personQueueName = "RabbitMqPoc.Person";

        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {

            cfg.UseConcurrencyLimit(10);

            var host = cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.ReceiveEndpoint(host, personQueueName, e =>
            {
                e.UseConcurrencyLimit(10);
                e.PrefetchCount = 10;
                e.Consumer<PersonConsumer>();
            });
        });

        var personSendEndpoint = busControl.GetSendEndpoint(new Uri($"rabbitmq://localhost/{personQueueName}")).Result;

        busControl.Start();

        foreach (var x in Enumerable.Range(1, 20))
        {
            personSendEndpoint.Send(new PersonUpdated() { Name = "Mina Ives", Key = Guid.NewGuid() });
        }

        Console.ReadLine();
        busControl.Stop();
    }
}

internal class PersonConsumer : IConsumer<IPersonUpdated>
{
    public async Task Consume(ConsumeContext<IPersonUpdated> context)
    {
        Thread.Sleep(1000);
        Console.WriteLine($"Updated {context.Message.Name}");
    }

}

Upvotes: 3

Views: 2571

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33512

Changing Thread.Sleep(1000); to await Task.Delay(1000); resolves the issue that you're seeing.

Thread.Sleep wreaks havoc on the TPL for some reason.

Upvotes: 3

Related Questions