Akhmed

Reputation: 1219

MassTransit consumer does not consume messages in Worker

I have a service that publishes messages to the "ta-emails" queue (SQS), and a running Worker that must consume these messages.

public class AdvisorRegisteredConsumer: IConsumer<AdvisorRegistered>
{
    private readonly ILogger<AdvisorRegisteredConsumer> _logger;

    public AdvisorRegisteredConsumer(
        ILogger<AdvisorRegisteredConsumer> logger
        )
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<AdvisorRegistered> context)
    {
        _logger.LogInformation("Consuming advisor id: {AdvisorId}", context.Message.AdvisorId);
    }
}

My MassTransit configuration in Program.cs of the Worker

        services.AddMassTransit(x =>
        {
            x.UsingAmazonSqs((context, cfg) =>
            {
                AmazonSQSConfig _sqsConfig = new AmazonSQSConfig
                {
                    ServiceURL = configuration.GetValue<string>("AWS:SQS:Endpoint"),
                    AuthenticationRegion = configuration.GetValue<string>("AWS:Region")
                };

                cfg.Host(configuration.GetValue<string>("AWS:Region"), h =>
                {
                    h.Config(_sqsConfig);
                    h.AccessKey(configuration.GetValue<string>("AWS:SQS:AccessKey"));
                    h.SecretKey(configuration.GetValue<string>("AWS:SQS:SecretAccessKey"));
                });
                
                cfg.ReceiveEndpoint("ta-emails", e =>
                {
                    e.ConfigureConsumer<AdvisorRegisteredConsumer>(context);
                });
            });
        });

Here is the appsettings.json config node

 "AWS": {
        "Region": "ru-central1",
        "SQS": {
          "Endpoint": "https://message-queue.api.cloud.yandex.net",
          "AccessKey": "REPLACED_MY_KEY_WITH_THIS",
          "SecretAccessKey": "REPLACED_MY_SECRET_KEY_WITH_THIS"
        }
      },

Worker itself

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;

    public Worker(ILogger<Worker> logger)
    {
        _logger = logger;
    }
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.UtcNow);

            await Task.Delay(30000, cancellationToken);
        }
    }
}

Both publishing service and consuming worker have identical (copy/paste) MassTransit service configuration in DI.

However, the message is successfully published by the service and never dispatched to the consumer in the running worker.

I assume that I forgot to add something related to the consumer configuration, or to the MassTransit configuration in general.

Upvotes: 1

Views: 707

Answers (1)

AndyG

Reputation: 41220

The first thing you want to do is make absolutely sure that both your publisher (or sender, if you use IBus.Send instead of IBus.Publish) and your consumer are operating on exactly the same message type, namespace included. This is how MassTransit matches messages to consumers.
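To illustrate why the namespace matters, here is a minimal sketch that uses only reflection. MassTransit identifies a message by a URN of the form urn:message:Namespace:TypeName, so two classes with the same name and shape but different namespaces are different messages. The namespaces Publisher.Contracts and Worker.Contracts, the Guid type of AdvisorId, and the ToUrn helper are assumptions made up for this example; the helper only imitates the naming convention and does not call MassTransit.

```csharp
using System;

namespace Publisher.Contracts
{
    // Hypothetical contract as declared inside the publishing service.
    public class AdvisorRegistered { public Guid AdvisorId { get; set; } }
}

namespace Worker.Contracts
{
    // Same name and shape, but a different namespace, so a different message type.
    public class AdvisorRegistered { public Guid AdvisorId { get; set; } }
}

public static class MessageIdentity
{
    // Illustrative helper imitating the "urn:message:Namespace:TypeName" convention.
    public static string ToUrn(Type type) => $"urn:message:{type.Namespace}:{type.Name}";

    public static void Main()
    {
        // The two URNs differ, so a consumer of one type never sees the other.
        Console.WriteLine(ToUrn(typeof(Publisher.Contracts.AdvisorRegistered)));
        Console.WriteLine(ToUrn(typeof(Worker.Contracts.AdvisorRegistered)));
    }
}
```

A common way to avoid the mismatch is to keep the contract in one shared class library that both the service and the worker reference, rather than copying the class into each project.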

With Amazon Simple Queue Service (SQS), MassTransit configures an Amazon Simple Notification Service (SNS) topic that publishes to your queues. If you or your company decide to create and configure the SNS topic and SQS queue(s) manually instead of granting your application (and therefore MassTransit) permission to do so, then make sure the topic's subscription is configured for raw message delivery. Otherwise the messages received by your queue are wrapped in an SNS envelope that MassTransit doesn't know what to do with.
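If the subscription was created by hand, raw message delivery can be switched on through the subscription's RawMessageDelivery attribute. The sketch below does this with the AWS SDK for .NET (the AWSSDK.SimpleNotificationService package). The RawDeliverySetup class and the subscriptionArn parameter are hypothetical names for this example; the ARN must be that of your own existing subscription, and whether a non-AWS SQS-compatible provider supports this call is not something the sketch assumes.

```csharp
using System.Threading.Tasks;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;

public static class RawDeliverySetup
{
    // Enables raw message delivery on an existing SNS -> SQS subscription,
    // so the queue receives the published body without the SNS envelope.
    public static async Task EnableRawDeliveryAsync(
        IAmazonSimpleNotificationService sns,
        string subscriptionArn) // hypothetical: ARN of your own subscription
    {
        await sns.SetSubscriptionAttributesAsync(new SetSubscriptionAttributesRequest
        {
            SubscriptionArn = subscriptionArn,
            AttributeName = "RawMessageDelivery",
            AttributeValue = "true"
        });
    }
}
```

The same attribute can also be set from the console or infrastructure templates; the SDK call is shown only because it makes the attribute name and value explicit.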

If you create publish, receive, and consume observers like so:

public class MyMassTransitObserver : IReceiveObserver, IPublishObserver, IConsumeObserver
{
 // ...
 public Task PreReceive(ReceiveContext context)
 {
    // Log the raw body so the message envelope can be inspected
    _logger.LogInformation("PreReceive: {msg}", context.Body.GetString());
    return Task.CompletedTask;
 }
 // do simple logging for all the other interface methods
}

...
services.AddMassTransit(configurator =>
{
    configurator.AddPublishObserver<MyMassTransitObserver>();
    configurator.AddReceiveObserver<MyMassTransitObserver>();
    configurator.AddConsumeObserver<MyMassTransitObserver>();
    // ...
});

Then you should see the following pattern logged to your console when you publish an event:

  • PrePublish
  • PostPublish
  • PreReceive
  • PreConsume
  • (Your consumer runs)
  • PostConsume
  • PostReceive

Logging the actual message in PreReceive is useful because it will show you whether the messageType is what you expect, and if your message is accidentally wrapped in an SNS envelope.
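As a rough aid when reading the logged body, the following sketch classifies a JSON payload by its top-level properties. It assumes that an SNS notification delivered without raw message delivery carries "Type": "Notification" together with a "Message" property, and that a MassTransit JSON envelope carries a "messageType" array. The EnvelopeInspector class and its return strings are hypothetical names for this example, not part of MassTransit.

```csharp
using System;
using System.Text.Json;

public static class EnvelopeInspector
{
    // Classifies a raw queue message body by the top-level JSON properties it carries.
    // Throws JsonException if the body is not valid JSON.
    public static string Classify(string body)
    {
        using JsonDocument doc = JsonDocument.Parse(body);
        JsonElement root = doc.RootElement;
        if (root.ValueKind != JsonValueKind.Object)
            return "unknown";

        // SNS notification wrapper: the real payload is a string inside "Message".
        if (root.TryGetProperty("Type", out JsonElement type)
            && type.ValueKind == JsonValueKind.String
            && type.GetString() == "Notification"
            && root.TryGetProperty("Message", out _))
            return "sns-envelope";

        // MassTransit envelope: "messageType" lists the message URNs.
        if (root.TryGetProperty("messageType", out JsonElement messageType)
            && messageType.ValueKind == JsonValueKind.Array)
            return "masstransit-envelope";

        return "unknown";
    }
}
```

Calling EnvelopeInspector.Classify(context.Body.GetString()) from PreReceive would then tell you at a glance which of the two shapes arrived on the queue.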

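If your message is misconfigured (for example, the message type does not match the consumer's, or the body is wrapped in an SNS envelope), then PreConsume...PostConsume will not run.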

If you don't see PreReceive...PostReceive, then messages may not be getting delivered to your SQS queue at all, so you will want to ensure that it is subscribed to the correct topic.

If you don't see PrePublish...PostPublish, then your logic to call IBus.Publish is not getting called, so you may want to step through your code with a debugger.

Upvotes: 0
