robor

Reputation: 3089

In RabbitMQ / MassTransit how to process ready messages while unacked messages are waiting to be retried?

We are using RabbitMQ 3.8.7 (Erlang 23.0) and MassTransit 7.0.4

We have a consumer configured to retry messages after 10 seconds and then again after 15 minutes before the messages end up in the error queue. Below is the retry configuration:

endpointConfigurator.UseMessageRetry(r => r.Intervals(
    TimeSpan.FromSeconds(10),
    TimeSpan.FromMinutes(15)));

The message consists of a barcode (i.e. a text string).

The consumer attempts to look up a parcel in a database based on the barcode. If the parcel is found, it is updated. Otherwise a C# exception is thrown. Below is the simplified code:

using (var context = DbContextScopeFactory.Create())
{
    var barcode = message.Barcode;
    var parcel = await parcelService.Query(ti => ti.Barcode == barcode).SingleAsync(cancellationToken); //this line throws an exception if the barcode is not found
    parcel.Delivered = true;
    parcelService.Update(parcel);
    await context.SaveChangesAsync(cancellationToken);
}
return true;

The parcels are inserted into the database by another system. The consumer often gets a barcode before the parcel exists in the database. The consumer also often gets invalid barcodes. Therefore the consumer retries barcodes after 10 seconds and again after 15 minutes instead of only trying once.

I ran a test with 10 messages where all 10 messages had invalid barcodes not in the database. This is the behavior I saw.

[screenshot]

  1. 8 unacked messages: These were processed once, then retried after 10 seconds and are now waiting for the retry after 15 minutes.
  2. 2 ready messages: These 2 messages have not been processed at all.
  3. The 8 unacked messages are blocking the 2 ready messages. Until the 8 unacked messages are retried after 15 minutes, the 2 ready messages are not processed.

Below is what happens when the 8 unacked messages have been retried after 15 minutes.

[screenshot]

Question

  1. How can I tell RabbitMQ / MassTransit to process the 2 ready messages instead of waiting for the 8 unacked messages?
  2. I have not set UseConcurrencyLimit. If not set, what is the default value of UseConcurrencyLimit?

Upvotes: 0

Views: 626

Answers (1)

Chris Patterson

Reputation: 33457

Instead of UseMessageRetry with a long timeout (which is discouraged in the documentation), if you have the delayed exchange plug-in installed on RabbitMQ you should use a combination of:

x.UseDelayedRedelivery(r => r.Interval(2, TimeSpan.FromMinutes(15)));
x.UseMessageRetry(r => r.Interval(1, TimeSpan.FromSeconds(3)));

That way, you aren't blocking messages. Message retry is inline, redelivery is via the broker.
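
For context, here is a minimal sketch of how those two calls might sit on a receive endpoint. The queue name `parcel-barcodes`, the `BarcodeMessage` and `BarcodeConsumer` types, and the `localhost` host are placeholders for illustration, not taken from the question. The sketch also assumes the delayed exchange plug-in is enabled on the broker.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message type; the question only says the message carries a barcode string.
public class BarcodeMessage
{
    public string Barcode { get; set; }
}

// Hypothetical consumer; the real one would look up and update the parcel,
// throwing when the barcode is not found so that retry/redelivery kicks in.
public class BarcodeConsumer : IConsumer<BarcodeMessage>
{
    public Task Consume(ConsumeContext<BarcodeMessage> context)
    {
        return Task.CompletedTask;
    }
}

public static class BusSetup
{
    public static IBusControl Create()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host("localhost"); // assumed broker address

            cfg.ReceiveEndpoint("parcel-barcodes", e => // assumed queue name
            {
                // Long waits: the message goes back to the broker and is
                // delivered again later, so it does not sit unacked.
                e.UseDelayedRedelivery(r => r.Interval(2, TimeSpan.FromMinutes(15)));

                // Short waits: retried in-process while the message is still unacked.
                e.UseMessageRetry(r => r.Interval(1, TimeSpan.FromSeconds(3)));

                e.Consumer<BarcodeConsumer>();
            });
        });
    }
}
```

Redelivery is configured before retry so that the short in-memory retry wraps the consumer and the broker-side redelivery only applies once those attempts are exhausted.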

Upvotes: 1
