Leeder
Leeder

Reputation: 77

How to add x-max-length and x-overflow to MassTransit queue while configuring connection?

I have a consumer, who needs to consume messages from an existing RabbitMQ queue. It works fine, when the queue is configured normally, without any settings.

services.AddMassTransit(config =>
{
    config.AddConsumer<OrderConsumer>();
    config.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("amqp://user:12345@localhost:54425");
        cfg.ReceiveEndpoint("transient-order-queue", c =>
        {
            c.ConfigureConsumer<OrderConsumer>(ctx);
        });
    });
});
services.AddMassTransitHostedService();

In order to get some work done, I need to configure the queue with a couple of features.

Features    
x-max-length:   1000
x-overflow: reject-publish
arguments:  
x-queue-type:   classic
durable:    true

How could I configure my consumer to connect to that queue? It gives me errors like that:

PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'transient-order-queue' in vhost '/': received none but current is the value '1000' of type 'long'

But I don't understand, how to add these args to the MassTransit configuration. Please, help me out!

Upvotes: 2

Views: 1070

Answers (3)

kertak
kertak

Reputation: 58

I'm use this solution realize interface of IConfigureReceiveEndpoint

public class ConfigureReceiveEndpointQueueSize : IConfigureReceiveEndpoint
{
    public void Configure(string name, IReceiveEndpointConfigurator configurator)
    {
        if (configurator is IRabbitMqReceiveEndpointConfigurator rabbitMqConfigurator)
        {
            rabbitMqConfigurator.SetQueueArgument("x-max-length", 10000);
            rabbitMqConfigurator.SetQueueArgument("x-overflow", "drop-head");
        }
    }
}

and then add it to DI in STartup

services.AddTransient<IConfigureReceiveEndpoint, ConfigureReceiveEndpointQueueSize>();

this queue attributes would be add in every Consumer Message Queue

Upvotes: 0

Leeder
Leeder

Reputation: 77

Found the answer. You can add attributes to the queue with the following syntax:

cfg.ReceiveEndpoint("transient-order-queue", c =>
{
    c.ConfigureConsumer<OrderConsumer>(ctx);
    c.SetQueueArgument("x-overflow", "reject-publish");
    c.SetQueueArgument("x-max-length", 1000);
});

Upvotes: 0

Chris Patterson
Chris Patterson

Reputation: 33457

Simply add them as queue attributes:

cfg.ReceiveEndpoint("transient-order-queue", c =>
        {
            c.QueueAttributes["x-max-length"] = 1000;
            c.QueueAttributes["x-overflow"] = "reject-publish";
            c.ConfigureConsumer<OrderConsumer>(ctx);
        });

Upvotes: 2

Related Questions