user19291301
user19291301

Reputation: 143

Masstransit: messages are send to _skipped queue

I am trying to send event from one service to another, but consumer receives the event and resends it to the _skipped queue. I could not find the solution.

The data model properties are same. One of the properties is Enum type. Consumer and Producer are separate projects.

Consumer Config:


  services.AddMassTransit(busConfig =>
        {
            busConfig.SetKebabCaseEndpointNameFormatter();
            busConfig.AddConsumer<ShooterGameEventsListener>();
            
            busConfig.UsingRabbitMq((context, cfg) =>
            {
                IOptions<MessageQueueOptions> messageQueueOptions = context.GetRequiredService<IOptions<MessageQueueOptions>>();
                IOptions<QueuesOptions> queueOptions = context.GetRequiredService<IOptions<QueuesOptions>>();

                cfg.UseRawJsonDeserializer();
                cfg.UseRawJsonSerializer();

                cfg.Host(messageQueueOptions.Value.Uri, "/", rabbitConfig =>
                {
                    rabbitConfig.Username(messageQueueOptions.Value.Username!);
                    rabbitConfig.Password(messageQueueOptions.Value.Password!);
                });
                
                cfg.ReceiveEndpoint(queueOptions.Value.ShooterGameQueue!, configEndpoint =>
                {
                    configEndpoint.ConcurrentMessageLimit = 10;

                    configEndpoint.Bind(queueOptions.Value.ShooterGameExchange);

                    configEndpoint.UseRawJsonDeserializer();
                    configEndpoint.ConfigureConsumer<ShooterGameEventsListener>(context);
                });
            });
        });

Producer Config:

  serviceCollection.AddMassTransit(busConfig =>
        {
            busConfig.SetKebabCaseEndpointNameFormatter();
            busConfig.AddConsumer<TransactionListener>();
            
            busConfig.UsingRabbitMq((context, cfg) =>
            {
                IOptions<MessageQueueOptions> messageQueueOptions = context.GetRequiredService<IOptions<MessageQueueOptions>>();
                IOptions<QueuesOptions> queueOptions = context.GetRequiredService<IOptions<QueuesOptions>>();

                cfg.UseRawJsonDeserializer();
                cfg.UseRawJsonSerializer();

                cfg.Host(messageQueueOptions.Value.Uri, "/", rabbitConfig =>
                {
                    rabbitConfig.Username(messageQueueOptions.Value.Username!);
                    rabbitConfig.Password(messageQueueOptions.Value.Password!);
                });

                #region ShooterGame

                cfg.Message<ShooterGameEvent>(topologyConfig =>
                    topologyConfig.SetEntityName(queueOptions.Value.ShooterGameExchange!));

                cfg.Publish<ShooterGameEvent>(publishConfig =>
                {
                    publishConfig.Durable = true;
                    publishConfig.AutoDelete = false;
                    publishConfig.ExchangeType = "fanout";
                });

                #endregion

                #region Transactions

                cfg.ReceiveEndpoint(queueOptions.Value.TransactionsQueue!, configEndpoint =>
                {
                    configEndpoint.ConfigureConsumeTopology = false;
                    configEndpoint.ConcurrentMessageLimit = 10;

                    configEndpoint.Bind(queueOptions.Value.TransactionsExchange);

                    configEndpoint.UseRawJsonDeserializer();
                    configEndpoint.ConfigureConsumer<TransactionListener>(context);
                });

                #endregion
            });
        });

The options are correct.

Upvotes: 0

Views: 12

Answers (0)

Related Questions