Reputation: 1
I am using MassTransit with RabbitMQ as a message bus and I'm attempting to send a batch of messages with priorities using C#. However, when I send the batch request with priorities, the responses come back in the same order as the messages in the array, rather than the prioritized order I expect.
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<WeatherInfoRequestConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("url", "username", h =>
{
h.Username("username");
h.Password("password");
});
// Configure priority queue
cfg.ReceiveEndpoint("weather-info-priority-queue", e =>
{
// Enable message priority (0-9)
e.SetQueueArgument("x-max-priority", 10);
// Set lower prefetch count to ensure high-priority messages aren't waiting, this helps to process the messages one by one
e.PrefetchCount = 1;
e.ConcurrentMessageLimit = 1;
e.UseMessageRetry(r => r.Immediate(5));
e.ConfigureConsumer<WeatherInfoRequestConsumer>(context);
});
});
});
My Program.cs config
[HttpGet("batch-test-concurrent")]
public async Task<IActionResult> BatchTestConcurrent(CancellationToken cancellationToken)
{
var cities = new[]
{
(City: "Paris", Priority: (byte)2),
(City: "Sydney", Priority: (byte)3),
(City: "Cancun", Priority: (byte)9),
(City: "Madrid", Priority: (byte)5),
(City: "Tokyo", Priority: (byte)1),
(City: "Barcelona", Priority: (byte)1),
(City: "Berlin", Priority: (byte)1)
};
// Send all requests with a small delay between them
var tasks = cities.Select(async (tuple, index) =>
{
var (City, Priority) = tuple;
// Small delay to ensure messages enter the queue separately
await Task.Delay(100 * index, cancellationToken);
_logger.LogInformation("Sending request for {City} with priority {Priority}", City, Priority);
var request = new WeatherInfoRequest
{
City = City,
RequestId = Guid.NewGuid()
};
Console.WriteLine("Sending request for city: {0} with priority {1}", City, Priority);
var response = await _requestClient.GetResponse<WeatherInfoResponse>(
request,
ctx =>
{
ctx.UseExecute(sendContext =>
{
sendContext.SetPriority(Priority);
_logger.LogInformation("Set priority {Priority} for {City}", Priority, City);
});
},
cancellationToken
);
Console.WriteLine("Received response for city: {0}", response.Message.City);
return new
{
response.Message.City,
Temperature = response.Message.TemperatureC,
TemperatureF = 32 + (int)(response.Message.TemperatureC / 0.5556),
Priority
};
});
var results = await Task.WhenAll(tasks);
return Ok(results.ToList());
}
My controller function where i hit the call
using MassTransit;
using MT_Req_Res.Models;
namespace MT_Req_Res.Consumers
{
public class WeatherInfoRequestConsumer : IConsumer<WeatherInfoRequest>
{
private static readonly string[] Summaries =
[
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
];
private readonly ILogger<WeatherInfoRequestConsumer> _logger;
public WeatherInfoRequestConsumer(ILogger<WeatherInfoRequestConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<WeatherInfoRequest> context)
{
_logger.LogInformation("Processing request for city: {City} , RequestId: {RequestId}",
context.Message.City,
context.Message.RequestId);
// To retrieve the AMQP priority, get the RabbitMQ consume context payload
if (context.TryGetPayload<RabbitMqBasicConsumeContext>(out var rabbitContext))
{
var priority = rabbitContext.Properties?.Priority;
_logger.LogInformation("Message AMQP priority: {Priority}", priority);
}
else
{
_logger.LogInformation("No RabbitMQ context payload found (not RabbitMQ or priority not set).");
}
// Simulate some processing time
await Task.Delay(2000);
// Create a response
var response = new WeatherInfoResponse
{
City = context.Message.City,
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
RequestId = context.Message.RequestId
};
_logger.LogInformation("Sending response for city: {City}, temp: {Temp}°C",
response.City, response.TemperatureC);
// Send the response back
await context.RespondAsync(response);
}
}
}
My Consumer
Issue:
When I hit the API endpoint, I get the responses back in the same order as the messages in the array, as shown below:
[
{
"city": "Paris",
"temperature": 36,
"temperatureF": 96,
"priority": 2
},
{
"city": "Sydney",
"temperature": -1,
"temperatureF": 31,
"priority": 3
},
{
"city": "Cancun",
"temperature": 47,
"temperatureF": 116,
"priority": 9
},
{
"city": "Madrid",
"temperature": 40,
"temperatureF": 103,
"priority": 5
},
{
"city": "Tokyo",
"temperature": 50,
"temperatureF": 121,
"priority": 1
},
{
"city": "Barcelona",
"temperature": 21,
"temperatureF": 69,
"priority": 1
},
{
"city": "Berlin",
"temperature": 24,
"temperatureF": 75,
"priority": 1
}
]
The responses are not coming back in the order of priority (i.e., "Cancun" with priority 9 should be processed first, but it is not). I have configured the priority queue in RabbitMQ and used SetPriority in the request message, but the messages are still being processed in the order they are sent. How can I make sure the messages are processed in priority order?
Upvotes: 0
Views: 14