Dan Cook
Dan Cook

Reputation: 2072

Mass Transit - Produce messages in Web App and consume in Azure Function

I have a Web App which sends messages over the Azure Service Bus and awaits a reply:

var response = await this.createClient.GetResponse<Models.Response<CommandResponse>>(message, cancellationToken);

I have an Azure Function with a Service Bus Trigger that consumes the messages:

        [FunctionName("CommandHandler")]
        public Task HandleCommandAsync(
            [ServiceBusTrigger("input-queue", Connection = "AzureWebJobsServiceBus"), ServiceBusAccount("ServiceBus")]
            Message message,
            IBinder binder,
            ILogger logger,
            CancellationToken cancellationToken)
        {
            logger.LogInformation("Command Handler Function Invoked.");

            var result = System.Text.Encoding.UTF8.GetString(message.Body);
            var d = JsonConvert.DeserializeObject<dynamic>(result);
            string messageType = d.message.messageType.Value;

            var handler = Bus.Factory.CreateBrokeredMessageReceiver(
                binder,
                cfg =>
                    {
                        cfg.CancellationToken = cancellationToken;
                        cfg.SetLog(logger);
                        cfg.InputAddress = new Uri($"{this.secrets.Value.ServiceBusUri}/input-queue");
                        cfg.UseRetry(x => x.Intervals(10, 100, 500, 1000));

                            cfg.Consumer(() => this.customerConsumer);
                    });

            var handlerResult = handler.Handle(message);
            logger.LogInformation("Command Handler Function Completed.");
            return handlerResult;
        }

In order to send messages from the web app I must configure the consumers. I'm not sure why this is necessary, as the Web App should never need to directly reference the consumers, but without the following code, no messages get sent.

            services.AddMassTransit(
                x =>
                {
                    x.AddConsumer<CustomerConsumer>();

                    x.AddBus(
                            provider => Bus.Factory.CreateUsingAzureServiceBus(
                                cfg =>
                                    {
                                        var host = cfg.Host(secrets.AzureWebJobsServiceBus, h => { });

                                        cfg.ReceiveEndpoint(
                                            host,
                                            "input-queue",
                                            ep =>
                                                {
                                                    ep.ConfigureConsumer<CustomerConsumer>(provider);

                                                    ep.PrefetchCount = 16;
                                                    ep.UseMessageRetry(r => r.Interval(2, 100));
                                                });
                                    }));

                    x.AddRequestClient<RegisterNewCustomerCommand>();
                });

The problem is after the message is sent, sometimes the CommandHandler function app triggers and invokes the Handle message of the consumer.

But sometimes the Web App inadvertently invokes the consumer directly (because the consumer is registered in startup and listening for messages on the queue).

This is undesired for me. For scalability only the Azure Function should invoke the consumers. Further, the consumers have dependencies injected that are only registered in the WebHostStartUp of the function app so consumers error out if invoked directly from the Web App.

Question: How can I break the dependency between the Web App and the consumers so that they never get invoked directly by the Web App but always by the function app trigger? Is there any way I can avoid Adding/Configuring the consumers in Web App's StartUp method whilst still enabling the Web App to send the message via GetResponse?

UPDATE - SOLUTION

The ASP.NET Core 2.2 Startup does not need to have any reference to the Consumers (unless unlike me you want your Web App to also consume messages).

You need to add the service bus URL including path to the queue to your AddRequestClient. See accepted answer.

This worked for me.


            services.AddMassTransit(
                x =>
                    {
                        x.AddBus(
                            provider => Bus.Factory.CreateUsingAzureServiceBus(
                                cfg =>
                                    {
                                        cfg.Host(
                                            secrets.AzureWebJobsServiceBus,
                                            h => { h.TransportType = TransportType.Amqp; });
                                    }));

                        var serviceBusUri = new Uri($"{settings.ServiceBusUri}/input-queue");
                        x.AddRequestClient<RegisterNewCustomerCommand>
                    });
        }

Upvotes: 0

Views: 1066

Answers (1)

Chris Patterson
Chris Patterson

Reputation: 33278

You shouldn't need to have the consumer on your client-side API. The only thing I can think is that you didn't have the topology setup right.

  1. Make sure the queue exists, in advance, since functions don't create the queues.
  2. Configure the URI address for the queue, so that your web API can specify it on the AddRequestClient call as an argument (_sb://host..../input-queue).
  3. Profit!

Because of how you configured the request client, it's likely that it was using Publish instead of Send, since it didn't know the queue address. In that scenario, adding the consumer to your web API did the proper subscription on the topic to forward the message to the queue. You can use something like Service Bus Explorer to see how it all gets laid out in the cloud.

Adding the URI to the client app for the request client should solve the issue, and send commands directly to the queue.

Upvotes: 1

Related Questions