Azure Event Grid subscription to Console Application

I want to subscribe to Azure Event Grid from a C# console application. I'm implementing the EventBus example from the eShopOnContainers project, and I need to subscribe to a topic, listen for messages, and then process and print the messages that were sent earlier by another C# console application that implements EventBus. How can I do that in a C# console application?

This is my Azure portal, where the messages are stored in queue storage:

azure portal subscriptions

This is the queue that holds all the messages:

all messages

So I need to subscribe and receive all of the messages.

Upvotes: 2

Views: 1905

Answers (1)

Roman Kiss

Reputation: 8235

Basically, there are three ways to use a console subscriber in the Azure Event Grid model. The following picture shows them:

enter image description here

Note that the hybrid connection and the ngrok tunnel are both used in my Azure Event Grid Tester. Have a look at their implementation.

The following code snippet is an example of using a HybridConnectionListener in a console app:

using Microsoft.Azure.Relay;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string connectionString = ConfigurationManager.AppSettings["HybridConnection"];
            HybridConnectionListener listener = null;

            try
            {
                listener = new HybridConnectionListener(connectionString);
                listener.Connecting += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.White;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] HybridConnection: Connecting, listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Online += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] HybridConnection: Online, listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Offline += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Blue;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] HybridConnection: Offline, listener:{listener.Address}");
                    Console.ResetColor();
                };

                listener.RequestHandler = (context) =>
                {
                    try
                    {
                        if (!context.Request.Headers.AllKeys.Contains("Aeg-Event-Type", StringComparer.OrdinalIgnoreCase) || !string.Equals(context.Request.Headers["Aeg-Event-Type"], "Notification", StringComparison.CurrentCultureIgnoreCase))
                            throw new Exception("Received message is not for EventGrid subscriber");

                        string jsontext = null;
                        using (var reader = new StreamReader(context.Request.InputStream))
                        {
                            var jtoken = JToken.Parse(reader.ReadToEnd());
                            if (jtoken is JArray)
                                jsontext = jtoken.SingleOrDefault<JToken>().ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JObject)
                                jsontext = jtoken.ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JValue)
                                throw new Exception($"The payload (JValue) is not accepted. JValue={jtoken.ToString(Newtonsoft.Json.Formatting.None)}");
                        }

                        Console.ForegroundColor = ConsoleColor.DarkYellow;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] Headers: {string.Join(" | ", context.Request.Headers.AllKeys.Where(i => i.StartsWith("aeg-") || i.StartsWith("Content-Type")).Select(i => $"{i}={context.Request.Headers[i]}"))}");
                        Console.ForegroundColor = ConsoleColor.Yellow;
                        Console.WriteLine($"{jsontext}");
                                             
                    }
                    catch (Exception ex)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] HybridConnection: Message processing failed - {ex.Message}");
                    }
                    finally
                    {
                        context.Response.StatusCode = HttpStatusCode.NoContent;
                        context.Response.Close();
                        Console.ResetColor();
                    }
                };
                await listener.OpenAsync(TimeSpan.FromSeconds(60));
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] Open HybridConnection failed - {ex.Message}");
                Console.ResetColor();
            }

            Console.ReadLine();

            if(listener != null)
                await listener.CloseAsync();
        }
    }
}

When the Hybrid Connection is used as the event handler destination in the AEG subscription, all events are delivered to the console app, as shown in the following screenshot:

enter image description here
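
One detail of the handler above is worth a closer look: it calls SingleOrDefault on the parsed array, which throws as soon as a delivery contains more than one event (for example, when batching is enabled on the subscription). The following is only a minimal sketch of an alternative that returns every event in the body. It assumes System.Text.Json instead of the Newtonsoft.Json types used in the answer so that it has no package dependency, and the names EventGridPayload and SplitEvents are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class EventGridPayload
{
    // Returns each event found in the request body as indented JSON text.
    // A JSON array yields one entry per element; a JSON object yields one entry.
    public static List<string> SplitEvents(string body)
    {
        var options = new JsonSerializerOptions { WriteIndented = true };
        var events = new List<string>();

        using (JsonDocument doc = JsonDocument.Parse(body))
        {
            JsonElement root = doc.RootElement;

            if (root.ValueKind == JsonValueKind.Array)
            {
                foreach (JsonElement item in root.EnumerateArray())
                {
                    events.Add(JsonSerializer.Serialize(item, options));
                }
            }
            else if (root.ValueKind == JsonValueKind.Object)
            {
                events.Add(JsonSerializer.Serialize(root, options));
            }
            else
            {
                // Like the handler above, reject bare JSON values.
                throw new FormatException($"Unexpected payload kind: {root.ValueKind}");
            }
        }

        return events;
    }
}
```

In the RequestHandler, the body read from context.Request.InputStream could be passed to SplitEvents and each returned string printed in turn.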

UPDATE:

The following example shows an implementation of the subscriber with an output binding to the SignalR service. In this scenario we need to build two HttpTrigger functions: one for the subscriber, and the other for the SignalR client to obtain a URL and an access token for a specific userId:

enter image description here

  1. HttpTriggerGetSignalRinfo function:

run.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"

using System;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req, SignalRConnectionInfo connectionInfo, ILogger log)
{
    log.LogInformation($"Info.Url={connectionInfo.Url}");

    return new OkObjectResult(new 
    { 
        url = connectionInfo.Url, 
        accessToken = connectionInfo.AccessToken,
    }); 
}

function.json:

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "get"
      ]
    },
    {
      "type": "signalRConnectionInfo",
      "name": "connectionInfo",
      "hubName": "%AzureSignalRHubName%",
      "connectionStringSetting": "AzureSignalRConnectionString",
      "userId": "{query.userid}",
      "direction": "in"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}
  2. signalR client - console app:

     using Microsoft.AspNetCore.SignalR.Client;
     using Newtonsoft.Json;
     using Newtonsoft.Json.Linq;
     using System;
     using System.Configuration;
     using System.Net.Http;
     using System.Threading.Tasks;
    
     namespace ConsoleApp4
     {
         class Program
         {
             static async Task Main(string[] args)
             {
                 HubConnection connection = null;
                 string userId = ConfigurationManager.AppSettings.Get("userId");
                 string signalRInfo = ConfigurationManager.AppSettings.Get("signalRInfo");
    
                 try
                 {
                     using (var client = new HttpClient())
                     {
                         var rsp = await client.GetAsync($"{signalRInfo}&userid={userId}");
                         string jsontext = await rsp.Content.ReadAsStringAsync();
                         var info = JsonConvert.DeserializeAnonymousType(jsontext, new { url = "", accessToken = "" });
    
                         connection = new HubConnectionBuilder()
                             .WithUrl(info.url, option =>
                             {
                             option.AccessTokenProvider = () =>
                                 {
                                     return Task.FromResult(info.accessToken);
                                 };
                             }).Build();
    
                         Console.ForegroundColor = ConsoleColor.Green;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] SignalR Client on {info.url}/users/{userId}");
                         Console.ResetColor();
                     }
    
                     connection.On<string, string>("SendMessage", (string headers, string message) =>
                     {
                         Console.ForegroundColor = ConsoleColor.DarkYellow;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] {headers}");
                         Console.ForegroundColor = ConsoleColor.Yellow;
                         Console.WriteLine($"{JToken.Parse(message).ToString(Formatting.Indented)}");
                         Console.ResetColor();
                     });
    
                     await connection.StartAsync();              
                 }
                 catch (Exception ex)
                 {
                     Console.ForegroundColor = ConsoleColor.Red;
                     Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] Open SignalR connection failed - {ex.Message}");
                     Console.ResetColor();
                 }
                 Console.ReadLine();
                 if (connection != null)
                     await connection.StopAsync();
             }       
         }
     }
    
  3. HttpTriggerSendMsgToSignalR function - subscriber:

run.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
#r "Newtonsoft.Json"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req, IAsyncCollector<SignalRMessage> signalRMessages, ILogger log)
{   
    string headers = string.Join(" | ", req.Headers.Where(h => h.Key.StartsWith("aeg-") || h.Key.StartsWith("Content-Type")).Select(i => $"{i.Key}={i.Value.First()}")); 
    log.LogInformation($"Method: {req.Method} Headers: {headers}");    
          
    if (req.Method == HttpMethod.Options.ToString())
    {
        log.LogInformation("CloudEventSchema validation");               
        req.HttpContext.Response.Headers.Add("Webhook-Allowed-Origin", req.Headers["WebHook-Request-Origin"].FirstOrDefault()?.Trim());
        return (ActionResult)new OkResult();
    }
    
    var jtoken = JToken.Parse(await new StreamReader(req.Body).ReadToEndAsync());
    string eventTypeHeader = req.Headers["aeg-event-type"].FirstOrDefault()?.Trim(); 

    if(eventTypeHeader == "SubscriptionValidation") 
    {       
        if(jtoken is JArray)
            jtoken = jtoken.SingleOrDefault<JToken>();

        if(jtoken["eventType"].Value<string>() == "Microsoft.EventGrid.SubscriptionValidationEvent")
        {
            log.LogInformation("EventGridSchema validation");
            return (ActionResult)new OkObjectResult(new { validationResponse = ((dynamic)jtoken["data"]["validationCode"])});         
        }           
        return new BadRequestObjectResult($"Not valid event schema");
    }   
    else if(eventTypeHeader == "Notification") 
    {          
        await signalRMessages.AddAsync(
            new SignalRMessage
            {
                // The message is sent only to the given user IDs; if that property is not set, the binding path is used instead.
                Target = "SendMessage",
                Arguments = new[] { headers, jtoken.ToString() }
            });        
        return (ActionResult)new OkResult();  
    }
     
    return new BadRequestObjectResult($"{eventTypeHeader} is not a valid type");
}

function.json:

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "options",
        "post"
      ]
    },
    {
      "type": "signalR",
      "name": "signalRMessages",
      "hubName": "%AzureSignalRHubName%/users/{query.userid}",
      "connectionStringSetting": "AzureSignalRConnectionString",
      "direction": "out"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}

Note that the webhook event handler is used for the subscriber for two reasons: it can deliver CloudEvent messages, and it allows the SignalR client userId to be configured via the URL query string parameter.
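
The Event Grid schema validation branch of the subscriber function above can also be read in isolation: the handler finds the single validation event, reads data.validationCode, and echoes it back as validationResponse. The following is a minimal sketch of just that step, not the function itself. It assumes System.Text.Json rather than the Newtonsoft.Json types used in run.csx, and the names SubscriptionValidation and BuildResponse are hypothetical:

```csharp
using System;
using System.Text.Json;

public static class SubscriptionValidation
{
    // Event type checked by the subscriber function above.
    public const string ValidationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";

    // Returns the JSON response body for a validation event,
    // or null when the payload is not a single validation event.
    public static string BuildResponse(string body)
    {
        using (JsonDocument doc = JsonDocument.Parse(body))
        {
            JsonElement evt = doc.RootElement;

            // The event may arrive wrapped in a one-element array.
            if (evt.ValueKind == JsonValueKind.Array)
            {
                if (evt.GetArrayLength() != 1) return null;
                evt = evt[0];
            }
            if (evt.ValueKind != JsonValueKind.Object) return null;

            if (!evt.TryGetProperty("eventType", out JsonElement type)) return null;
            if (type.ValueKind != JsonValueKind.String) return null;
            if (type.GetString() != ValidationEventType) return null;

            if (!evt.TryGetProperty("data", out JsonElement data)) return null;
            if (data.ValueKind != JsonValueKind.Object) return null;
            if (!data.TryGetProperty("validationCode", out JsonElement code)) return null;
            if (code.ValueKind != JsonValueKind.String) return null;

            // Echo the code back under the property name used in run.csx.
            return JsonSerializer.Serialize(new { validationResponse = code.GetString() });
        }
    }
}
```

A null result corresponds to the BadRequestObjectResult path in the function; any other result would be returned with a 200 status.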

  4. Displaying events for userid=abcd on the Console app:

enter image description here

Note that SignalR client instances allow messages to be multicast to all clients with the same userid, in contrast to the Hybrid Connection, where messages are load-balanced between the listener instances.

Upvotes: 3
