Neo
Neo

Reputation: 163

Published messages going into skipped while using RabbitMQ with Masstransit

I have 2 applications, one for publishing and another for subscribing.

Let's see the publishing code used for registering masstransit.

context.Services.AddMassTransit<IEmployeeSvcBus>(x =>
            {                
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host("rabbitmq://localhost:5672");

                    cfg.Message<ICreateEmployeeEto>(m => m.SetEntityName("CreateEmployee"));
                    cfg.Message<IUpdateEmployeeEto>(m => m.SetEntityName("UpdateEmployee"));
                    
                    cfg.Publish<ICreateEmployeeEto>(y =>
                    {
                        y.ExchangeType = ExchangeType.FanOut.ToString().ToLower();
                        y.AutoDelete = false;
                        y.Durable = true;
                    });

                    cfg.Publish<IUpdateEmployeeEto>(y =>
                    {
                        y.ExchangeType = ExchangeType.FanOut.ToString().ToLower();
                        y.AutoDelete = false;
                        y.Durable = true;
                    });                    
                });
            });

We're using multiple buses, hence I am using IEmployeeSvcBus for this bus.

public interface IEmployeeSvcBus: IBus
{
}

With the above configuration I meant to say that message types of ICreateEmployeeEto and IUpdateEmployeeEto should be published to exchanges with names CreateEmployee and UpdateEmployee respectively.

Issue 1: There is an exchange created with: <namespace>:ICreateEmployeeEto & <namespace>:IUpdateEmployeeEto.

For the time being I manually binded them to CreateEmployee & UpdateEmployee respectively in RabbitMQ.

Now let's look at the configuration at consumer application and also see the consumer classes.

context.Services.AddMassTransit<IEmployeeSvcBus>(x =>
            {
                x.AddConsumer<CreateEmployeeConsumer>();
                x.AddConsumer<UpdateEmployeeConsumer>();

                x.UsingRabbitMq((context, busCfg) =>
                {
                    busCfg.Host("rabbitmq://localhost:5672");
                    
                    busCfg.ReceiveEndpoint("employees.create_dev", ep =>
                    {
                        ep.Bind("CreateEmployee");
                        ep.ConfigureConsumeTopology = false;
                        ep.Durable = true;
                        ep.Lazy = true;
                        ep.Consumer<CreateEmployeeConsumer>();
                    });

                    busCfg.ReceiveEndpoint("employees.update_dev", ep =>
                    {
                        ep.Bind("UpdateEmployee");
                        ep.ConfigureConsumeTopology = false;
                        ep.Durable = true;
                        ep.Lazy = true;
                        ep.Consumer<UpdateEmployeeConsumer>();
                    });
                });
            });

Also see the consumer definitions:

public class CreateEmployeeConsumer: IConsumer<ICreateEmployeeEto>
    {
        private readonly IBus _localBus;
        private readonly ImySvc _mySvc;

        public CreateEmployeeConsumer(
            IBus localBus,
            ImySvc mySvc)
        {
            _localBus = localBus;
            _mySvc= mySvc;
        }

        public CreateEmployeeConsumer()
        {
            //For Bus Registration
        }

        public async Task Consume(ConsumeContext<ICreateEmployeeEto> context)
        {
            //Some operations.
        }
    }

Issue 2: When I publish the CreateEmployeeEto, its going to the employees.create_dev_skipped queue.

How'd I check what may be causing the issue? Is there some misconfiguration in the code that I missed? Please let me know. I am new to masstransit.

for issue 1: I tried checking the exchange name inside the Publish method by using x.Exchange.ExchangeName, it's giving as "CreateCompanyPlaceholder". But no matter what it's going to :ICreateEmployeeEto.

for issue 2: I tried commenting the injections used, removing binding and tried.

Upvotes: 2

Views: 808

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33278

When MassTransit moves a message to the _skipped queue, it's because the message type is not consumed on the receive endpoint. Be sure your messages are created properly and have the same type and namespace:

From the documentation.

MassTransit uses the full type name, including the namespace, for message contracts. When creating the same message type in two separate projects, the namespaces must match or the message will not be consumed.

Beyond that, if you wish to produce and consume messages using different exchange names for certain message types, you can use attributes or the message topology configuration (similar to what you specified above) to change the exchange names for those message types.

The changes should be specified on both the producer and the consumer to ensure consistency. At that point, MassTransit will do the right thing and bind the proper exchanges for you.

Upvotes: 5

Related Questions