Kaif Khan
Kaif Khan

Reputation: 1

MassTransit with RabbitMQ: Priority Messages Not Processed in Correct Order

I am using MassTransit with RabbitMQ as a message bus and I'm attempting to send a batch of messages with priorities using C#. However, when I send the batch request with priorities, the responses come back in the same order as the messages in the array, rather than the prioritized order I expect.

builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<WeatherInfoRequestConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("url", "username", h =>
        {
            h.Username("username");
            h.Password("password");
        });

        // Configure priority queue
        cfg.ReceiveEndpoint("weather-info-priority-queue", e =>
        {
            // Enable message priority (0-9)
            e.SetQueueArgument("x-max-priority", 10);
            // Set lower prefetch count to ensure high-priority messages aren't waiting, this helps to process the messages one by one
            e.PrefetchCount = 1;
            e.ConcurrentMessageLimit = 1;

            e.UseMessageRetry(r => r.Immediate(5));

            e.ConfigureConsumer<WeatherInfoRequestConsumer>(context);
        });
    });
});

My Program.cs config

[HttpGet("batch-test-concurrent")]
        public async Task<IActionResult> BatchTestConcurrent(CancellationToken cancellationToken)
        {
            var cities = new[]
            {
        (City: "Paris", Priority: (byte)2),
        (City: "Sydney", Priority: (byte)3),
        (City: "Cancun", Priority: (byte)9),
        (City: "Madrid", Priority: (byte)5),
        (City: "Tokyo", Priority: (byte)1),
        (City: "Barcelona", Priority: (byte)1),
        (City: "Berlin", Priority: (byte)1)
    };

            // Send all requests with a small delay between them
            var tasks = cities.Select(async (tuple, index) =>
            {
                var (City, Priority) = tuple;
                // Small delay to ensure messages enter the queue separately
                await Task.Delay(100 * index, cancellationToken);

                _logger.LogInformation("Sending request for {City} with priority {Priority}", City, Priority);

                var request = new WeatherInfoRequest
                {
                    City = City,
                    RequestId = Guid.NewGuid()
                };
                Console.WriteLine("Sending request for city: {0} with priority {1}", City, Priority);
                var response = await _requestClient.GetResponse<WeatherInfoResponse>(
                    request,
                    ctx =>
                    {
                        ctx.UseExecute(sendContext =>
                        {
                            sendContext.SetPriority(Priority);
                            _logger.LogInformation("Set priority {Priority} for {City}", Priority, City);
                        });
                    },
                    cancellationToken
                );

                Console.WriteLine("Received response for city: {0}", response.Message.City);
                return new
                {
                    response.Message.City,
                    Temperature = response.Message.TemperatureC,
                    TemperatureF = 32 + (int)(response.Message.TemperatureC / 0.5556),
                    Priority
                };
            });

            var results = await Task.WhenAll(tasks);
            return Ok(results.ToList());
        }

My controller function where i hit the call

using MassTransit;
using MT_Req_Res.Models;

namespace MT_Req_Res.Consumers
{
    public class WeatherInfoRequestConsumer : IConsumer<WeatherInfoRequest>
    {
        private static readonly string[] Summaries =
        [
            "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
        ];

        private readonly ILogger<WeatherInfoRequestConsumer> _logger;

        public WeatherInfoRequestConsumer(ILogger<WeatherInfoRequestConsumer> logger)
        {
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<WeatherInfoRequest> context)
        {
            _logger.LogInformation("Processing request for city: {City} , RequestId: {RequestId}",
                context.Message.City,
                context.Message.RequestId);
            // To retrieve the AMQP priority, get the RabbitMQ consume context payload
            if (context.TryGetPayload<RabbitMqBasicConsumeContext>(out var rabbitContext))
            {
                var priority = rabbitContext.Properties?.Priority;
                _logger.LogInformation("Message AMQP priority: {Priority}", priority);
            }
            else
            {
                _logger.LogInformation("No RabbitMQ context payload found (not RabbitMQ or priority not set).");
            }

            // Simulate some processing time
            await Task.Delay(2000);

            // Create a response
            var response = new WeatherInfoResponse
            {
                City = context.Message.City,
                TemperatureC = Random.Shared.Next(-20, 55),
                Summary = Summaries[Random.Shared.Next(Summaries.Length)],
                RequestId = context.Message.RequestId
            };

            _logger.LogInformation("Sending response for city: {City}, temp: {Temp}°C",
                response.City, response.TemperatureC);

            // Send the response back
            await context.RespondAsync(response);
        }
    }
}

My Consumer

Issue:

When I hit the API endpoint, I get the responses back in the same order as the messages in the array, as shown below:

[
  {
    "city": "Paris",
    "temperature": 36,
    "temperatureF": 96,
    "priority": 2
  },
  {
    "city": "Sydney",
    "temperature": -1,
    "temperatureF": 31,
    "priority": 3
  },
  {
    "city": "Cancun",
    "temperature": 47,
    "temperatureF": 116,
    "priority": 9
  },
  {
    "city": "Madrid",
    "temperature": 40,
    "temperatureF": 103,
    "priority": 5
  },
  {
    "city": "Tokyo",
    "temperature": 50,
    "temperatureF": 121,
    "priority": 1
  },
  {
    "city": "Barcelona",
    "temperature": 21,
    "temperatureF": 69,
    "priority": 1
  },
  {
    "city": "Berlin",
    "temperature": 24,
    "temperatureF": 75,
    "priority": 1
  }
]

The responses are not coming back in the order of priority (i.e., "Cancun" with priority 9 should be processed first, but it is not). I have configured the priority queue in RabbitMQ and used SetPriority in the request message, but the messages are still being processed in the order they are sent. How can I make sure the messages are processed in priority order?

Upvotes: 0

Views: 14

Answers (0)

Related Questions