CESCO

Reputation: 7758

Receive Event Grid from a Service Bus Handler on Azure Services

I am publishing an event to Event Grid as shown below. I can see the events arriving in the Azure dashboard.

            NomeEmailChange yay = new NomeEmailChange
            {
                Nome = "cesco",
                Email = "cesco"
            };

            var primaryTopicKey = _config["EventGridConfig:AcessKey"];
            var primaryTopic = _config["EventGridConfig:Endpoint"];

            var primaryTopicHostname = new Uri(primaryTopic).Host;

            var topicCredentials = new TopicCredentials(primaryTopicKey);
            var client = new EventGridClient(topicCredentials);

            var id = Guid.NewGuid().ToString();
            var hey = new List<EventGridEvent>
            {
                new EventGridEvent()
                {
                    Id = id,
                    EventType = "cesco-cesco",
                    Data = (yay),
                    EventTime = DateTime.Now,
                    Subject = "MS_Clientes",
                    DataVersion = "1.0",
                }
            };
            // Await the call so publish failures surface (the enclosing method must be async).
            await client.PublishEventsAsync(primaryTopicHostname, hey);

Then I created an Event Grid subscription. I can confirm in the portal that the messages are arriving at the Event Grid subscription.

In another project I subscribe to the Service Bus as shown below. This works fine for consuming messages sent directly to the bus.

        public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
            IHostingEnvironment env)
        {
            services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
            services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
            {
                var keyName = "RootManageSharedAccessKey";
                var busName = configuration["ServiceBus:Name"];
                var secret = configuration["ServiceBus:Secret"];
                var host = cfg.Host(
                    "Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
                    "SharedAccessKeyName=" + keyName + ";" +
                    "SharedAccessKey=" + secret,
                    z =>
                    {
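                        // Note: the provider created here is discarded; the connection string above already carries the SAS key.
                        TokenProvider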
                            .CreateSharedAccessSignatureTokenProvider(keyName, secret);
                    });
                cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
                cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
                    e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
            }));
            services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
            services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
            services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
            services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
            services.AddSingleton<IHostedService, BusService>();
            return services;
        }

Everything should work at this point, but in this other project I get the following error when the message arrives:

fail: MassTransit.Messages[0]
      R-FAULT sb://sbacompanharreldev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
   at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

Upvotes: 3

Views: 1161

Answers (2)

kgalic

Reputation: 2654

As suggested, you have to write a custom deserializer. In my implementation, I changed a couple of things about how the MassTransit package is configured.

First, you have to register deserializers for different Content Types:

  1. If your message comes from a MassTransit publisher, it carries the default MassTransit content type (in C#: JsonMessageSerializer.JsonContentType).
  2. If your message comes from a Service Bus queue/topic, it is usually "application/json".
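The error in the question reports the content type as "application/json; charset=utf-8", while the registration below uses plain "application/json". A minimal sketch of how the two relate, using only System.Net.Mime: the media type is the same once the charset parameter is set aside. The helper name ContentTypeCheck is hypothetical, and whether MassTransit matches deserializers on the media type alone is an assumption here, not something the thread confirms.

```csharp
using System;
using System.Net.Mime;

// Hypothetical helper, for illustration only.
public static class ContentTypeCheck
{
    // Returns true when two Content-Type header values share the same media type,
    // ignoring parameters such as charset.
    public static bool SameMediaType(string left, string right)
    {
        var a = new ContentType(left);
        var b = new ContentType(right);
        return string.Equals(a.MediaType, b.MediaType, StringComparison.OrdinalIgnoreCase);
    }
}
```

Calling ContentTypeCheck.SameMediaType with the value from the error and "application/json" compares equal, whereas comparing it with "application/vnd.masstransit+json" does not, which is consistent with the deserializers listed in the error message not being picked.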

The following code shows how to register the deserializers and how to set up the receive endpoint. One difference from your approach is that I used a connection string, so there is no need for the SAS token part.

    class Program
    {
        static string ContentTypeJson = "application/json";
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
            {
                var queueName = "Your SB Queue Name";
                var connectionString = "Connection string with a RootManage policy";
                var host = cfg.Host(connectionString, h =>
                {
                    h.OperationTimeout = TimeSpan.FromSeconds(60);
                });

                cfg.ReceiveEndpoint(queueName,
                    e =>
                    {
                        e.AddMessageDeserializer(contentType: new ContentType(ContentTypeJson), () =>
                        {
                            return new EventGridMessgeDeserializer(ContentTypeJson);
                        });
                        e.Consumer(() => new EventGridMessageConsumer());

                        // Uncomment if required deserializer for local messages - mass transit as publisher or direct messages from SB
                        //e.AddMessageDeserializer(contentType: JsonMessageSerializer.JsonContentType, () =>
                        //{
                        //    return new CustomMessageDeserializer(JsonMessageSerializer.JsonContentType.ToString());
                        //});
                        //e.Consumer(() => new MessageConsumer());
                    });

            });
            bus.Start();

            Console.WriteLine("Press any key to exit");
            // for testing purposes of local messages - mass transit as publisher
            // await bus.Publish<CustomMessage>(new {  Hello = "Hello, World." });
            await Task.Run(() => Console.ReadKey());

            await bus.StopAsync();

        }
    }

I used my message simulator to pipe messages to Event Grid, which forwards them to the Service Bus queue. The outcome of running this code was shown in a screenshot:

[Image: output of running the code above]

The deserializer for Event Grid messages, as well as the full code, is available on GitHub: https://github.com/kgalic/MassTransitSample. It is also in my answer to your other question.
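For readers who only want the core idea, here is a minimal sketch of what such a deserializer has to do with the message body: unwrap the Event Grid envelope and bind its "data" member to the message type. This is not the code from the repository and it does not implement any MassTransit interface; the class name EventGridEnvelope and the use of System.Text.Json are assumptions made for illustration. The NomeEmailChange shape is taken from the question.

```csharp
using System;
using System.Text.Json;

// Shape of the asker's payload; property names taken from the question.
public class NomeEmailChange
{
    public string Nome { get; set; }
    public string Email { get; set; }
}

// Hypothetical helper, for illustration only.
public static class EventGridEnvelope
{
    // Extracts the "data" member from an Event Grid event and binds it to T.
    // Accepts either a single event object or an array of events
    // (only the first element is read in the array case).
    public static T ReadData<T>(string json)
    {
        using JsonDocument doc = JsonDocument.Parse(json);
        JsonElement root = doc.RootElement;

        if (root.ValueKind == JsonValueKind.Array)
        {
            if (root.GetArrayLength() == 0)
                throw new FormatException("Empty Event Grid batch.");
            root = root[0];
        }

        if (root.ValueKind != JsonValueKind.Object ||
            !root.TryGetProperty("data", out JsonElement data))
        {
            throw new FormatException("No 'data' member found in the Event Grid event.");
        }

        // Case-insensitive binding, since the publisher may emit Nome/Email or nome/email.
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        return JsonSerializer.Deserialize<T>(data.GetRawText(), options);
    }
}
```

Inside a real deserializer, the string passed to ReadData would be the body of the received Service Bus message, and the result would be handed to the consumer as the message instance.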

Upvotes: 5

Sean Feldman

Reputation: 25994

MassTransit wraps each message in an envelope and relies on that envelope to deserialize the content. Messages forwarded by Event Grid do not have it, so you will need to create and register a custom message deserializer to let MassTransit ingest and process them. For more information, see the Interoperability documentation. Once the custom deserializer is created, it can be registered using the configuration API.

Upvotes: 0
