Reputation: 143
I am trying to send event from one service to another, but consumer receives the event and resends it to the _skipped queue. I could not find the solution.
The data model properties are same. One of the properties is Enum type. Consumer and Producer are separate projects.
Consumer Config:
services.AddMassTransit(busConfig =>
{
busConfig.SetKebabCaseEndpointNameFormatter();
busConfig.AddConsumer<ShooterGameEventsListener>();
busConfig.UsingRabbitMq((context, cfg) =>
{
IOptions<MessageQueueOptions> messageQueueOptions = context.GetRequiredService<IOptions<MessageQueueOptions>>();
IOptions<QueuesOptions> queueOptions = context.GetRequiredService<IOptions<QueuesOptions>>();
cfg.UseRawJsonDeserializer();
cfg.UseRawJsonSerializer();
cfg.Host(messageQueueOptions.Value.Uri, "/", rabbitConfig =>
{
rabbitConfig.Username(messageQueueOptions.Value.Username!);
rabbitConfig.Password(messageQueueOptions.Value.Password!);
});
cfg.ReceiveEndpoint(queueOptions.Value.ShooterGameQueue!, configEndpoint =>
{
configEndpoint.ConcurrentMessageLimit = 10;
configEndpoint.Bind(queueOptions.Value.ShooterGameExchange);
configEndpoint.UseRawJsonDeserializer();
configEndpoint.ConfigureConsumer<ShooterGameEventsListener>(context);
});
});
});
Producer Config:
serviceCollection.AddMassTransit(busConfig =>
{
busConfig.SetKebabCaseEndpointNameFormatter();
busConfig.AddConsumer<TransactionListener>();
busConfig.UsingRabbitMq((context, cfg) =>
{
IOptions<MessageQueueOptions> messageQueueOptions = context.GetRequiredService<IOptions<MessageQueueOptions>>();
IOptions<QueuesOptions> queueOptions = context.GetRequiredService<IOptions<QueuesOptions>>();
cfg.UseRawJsonDeserializer();
cfg.UseRawJsonSerializer();
cfg.Host(messageQueueOptions.Value.Uri, "/", rabbitConfig =>
{
rabbitConfig.Username(messageQueueOptions.Value.Username!);
rabbitConfig.Password(messageQueueOptions.Value.Password!);
});
#region ShooterGame
cfg.Message<ShooterGameEvent>(topologyConfig =>
topologyConfig.SetEntityName(queueOptions.Value.ShooterGameExchange!));
cfg.Publish<ShooterGameEvent>(publishConfig =>
{
publishConfig.Durable = true;
publishConfig.AutoDelete = false;
publishConfig.ExchangeType = "fanout";
});
#endregion
#region Transactions
cfg.ReceiveEndpoint(queueOptions.Value.TransactionsQueue!, configEndpoint =>
{
configEndpoint.ConfigureConsumeTopology = false;
configEndpoint.ConcurrentMessageLimit = 10;
configEndpoint.Bind(queueOptions.Value.TransactionsExchange);
configEndpoint.UseRawJsonDeserializer();
configEndpoint.ConfigureConsumer<TransactionListener>(context);
});
#endregion
});
});
The options are correct.
Upvotes: 0
Views: 12