Phil
Phil

Reputation: 102

Masstransit can't connect to a queue in RabbitMQ from c# app

I have a queue in RabbitMQ. I can't configurate this queue, I have to consume messages from it. Publisher don't use Masstransit for publishing. I am using Masstransit to consume messages from the queue.

When I am trying to configurate connection to the queue, I am receiving this error:

The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'my_vhost': received 'fanout' but current is 'direct'', classId=40, methodId=10

My configuration looks like:

Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("127.0.0.1", "my_virtual_host", credintials =>
    {
        credintials.Username("myuser");
        credintials.Password("mypassword");
    });

    cfg.ReceiveEndpoint("my_queue", e =>
    {

        e.UseRawJsonSerializer();
        e.Consumer(() => _messageConsumer);
    });
}).Start();

The queue has configuration Durable = true and that's it, nothing special.

When I am trying to connect to the queue via RabbitMQ.Client, it connects without problems. Consuming works well too.

How can I solve the problem?

Upvotes: 0

Views: 3084

Answers (2)

Phil
Phil

Reputation: 102

I solve my problem. Connection to Masstransit looks like:

private async Task InitMasstransitBusAsync(CancellationToken cancellationToken)
{
    await Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(new Uri(_rabbitMqConfig.HostName), credintials =>
        {
            credintials.Username(_rabbitMqConfig.UserName);
            credintials.Password(_rabbitMqConfig.Password);

        });
        cfg.ReceiveEndpoint(_rabbitMqConfig.QueueName, e =>
        {
            e.PrefetchCount = 20;
            e.ExchangeType = "direct";
            e.ConfigureConsumeTopology = false;
            e.AddMessageDeserializer(new ContentType("text/plain"),
                () => new CustomMessageDeserializer("text/plain"));
            e.Consumer(() => _messageConsumer);
        });
    }).StartAsync(cancellationToken);
}

CustomMessageDeserializer:

    public class CustomMessageDeserializer : IMessageDeserializer
    {
        private readonly string _contentType;
        private readonly JsonSerializer _serializer = JsonSerializer.Create();

        public CustomMessageDeserializer(string contentType)
        {
            _contentType = contentType;
        }

        public ContentType ContentType => new(_contentType);

        public ConsumeContext Deserialize(ReceiveContext receiveContext)
        {
            try
            {
                var messageEncoding = GetMessageEncoding(receiveContext);
                using var body = receiveContext.GetBodyStream();
                using var reader = new StreamReader(body, messageEncoding, false, 1024, true);
                using var jsonReader = new JsonTextReader(reader);
                var messageToken = _serializer.Deserialize<JToken>(jsonReader);
                
                return new RawJsonConsumeContext(_serializer, receiveContext, messageToken);
            }
            catch (JsonSerializationException ex)
            {
                throw new SerializationException("A JSON serialization exception occurred while deserializing the message", ex);
            }
            catch (SerializationException)
            {
                throw;
            }
            catch (Exception ex)
            {
                throw new SerializationException("An exception occurred while deserializing the message", ex);
            }
        }

        public void Probe(ProbeContext context) { }

        public static Encoding GetMessageEncoding(ReceiveContext receiveContext)
        {
            var contentEncoding = receiveContext.TransportHeaders.Get("Content-Encoding", default(string));
            return string.IsNullOrWhiteSpace(contentEncoding) ? Encoding.UTF8 : Encoding.GetEncoding(contentEncoding);
        }

    }

Upvotes: 0

Chris Patterson
Chris Patterson

Reputation: 33522

The issue is the my_queue exchange already exists with a direct exchange type. By default, MassTransit will create this exchange as a fanout exchange. Direct exchanges are used for routing messages via routing key. For an example of using direct exchanges with MassTransit, check out the direct sample.

You can see the broker topology configured by MassTransit for RabbitMQ.

Upvotes: 1

Related Questions