Reputation: 148
Can I use SharedAccessKey to connect to the broker (EventHubs)?
I'm unable to connect to my Azure Event Hubs namespace.
We use a SharedAccessKey instead of SSL certificates to connect, and I have this configuration for it:
"EventBusConfig": {
  "BootstrapServers": "anyname.servicebus.windows.net:9093",
  "SecurityProtocol": "SaslSsl",
  "SaslMechanism": "Plain",
  "SaslUsername": "$ConnectionString",
  "SaslPassword": "Endpoint=sb://anyname.servicebus.windows.net/;SharedAccessKeyName=anyname.;SharedAccessKey=CtDbJ/Kfjs7498s--anypassword--SkSk749/z2Z5Fr9///33/qQ+R6Cyg=",
  "SocketTimeoutMs": "60000",
  "SessionTimeoutMs": "30000",
  "GroupId": "NameOfTheGroup",
  "AutoOffsetReset": "Earliest",
  "BrokerVersionFallback": "1.0.0",
  "Debug": "cgrp"
}
But it seems I need the CA certificate path (the .pem file).
I want to produce a simple message like this
I'm using https://github.com/Azure/azure-functions-kafka-extension but I don't know whether this beta library can handle a SharedAccessKey.
I got this error when trying to connect:
Any help would be appreciated.
Upvotes: 1
Views: 1543
Reputation: 148
I was able to produce and consume messages using the extension https://github.com/Azure/azure-functions-kafka-extension.
Consuming a message was easy thanks to the very intuitive "EventHubConnectionString" property.
To produce a message you need to configure a CA certificate. I thought I needed it from Azure, but I was wrong; I just followed these instructions to make it work.
Download and set the CA certificate location. As described in the Confluent documentation, the .NET library does not have the capability to access root CA certificates. Missing this step will cause your function to raise the error "sasl_ssl://xyz-xyzxzy.westeurope.azure.confluent.cloud:9092/bootstrap: Failed to verify broker certificate: unable to get local issuer certificate (after 135ms in state CONNECT)".
To overcome this, we need to:
- Download a CA certificate bundle (e.g. from https://curl.haxx.se/ca/cacert.pem).
- Rename the certificate file to anything other than cacert.pem to avoid a conflict with the Event Hubs Kafka certificate that ships with the extension.
- Include the file in the project and set "Copy to Output Directory" on it.
- Set the SslCaLocation trigger attribute property. In this example we set it to confluent_cloud_cacert.pem.
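The "copy to output" step above can also be expressed directly in the .csproj; a minimal sketch, assuming the certificate file sits at the project root under the name used in this example:

```xml
<ItemGroup>
  <!-- Copy the CA certificate next to the compiled function so SslCaLocation can find it. -->
  <None Update="confluent_cloud_cacert.pem">
    <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
  </None>
</ItemGroup>
```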
This is my producer Azure Function with the Kafka output binding:
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace EY.Disruptor.AzureFunctionsWithKafka
{
    public static class Function
    {
        [FunctionName("Producer")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
            [Kafka("BootstrapServer",
                "topic.event",
                Username = "ConfluentCloudUsername",
                Password = "ConfluentCloudPassword",
                SslCaLocation = "confluent_cloud_cacert.pem",
                AuthenticationMode = BrokerAuthenticationMode.Plain,
                Protocol = BrokerProtocol.SaslSsl
            )] IAsyncCollector<KafkaEventData<string>> events,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");

            string name = req.Query["name"];
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            dynamic data = JsonConvert.DeserializeObject(requestBody);
            name ??= data?.name;

            string responseMessage = string.IsNullOrEmpty(name)
                ? "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response."
                : $"Hello, {name}. This HTTP triggered function executed successfully.";

            var kafkaEvent = new KafkaEventData<string>
            {
                Value = name
            };
            await events.AddAsync(kafkaEvent);

            return new OkObjectResult(responseMessage);
        }
    }
}
This is my consumer Azure Function with the Kafka trigger binding:
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;

namespace EY.Disruptor.AzureFunctionsWithKafka
{
    public static class Consumer
    {
        [FunctionName("FunctionKafkaConsumer")]
        public static void Run(
            [KafkaTrigger("BootstrapServer",
                "topic.event",
                Username = "ConfluentCloudUsername",
                Password = "ConfluentCloudPassword",
                // Same app setting as the password: its value is the Event Hubs connection string.
                EventHubConnectionString = "ConfluentCloudPassword",
                AuthenticationMode = BrokerAuthenticationMode.Plain,
                Protocol = BrokerProtocol.SaslSsl,
                ConsumerGroup = "Group1")] KafkaEventData<string>[] kafkaEvents,
            ILogger logger)
        {
            foreach (var kafkaEvent in kafkaEvents)
            {
                logger.LogInformation(kafkaEvent.Value);
            }
        }
    }
}
This is my local.settings.json:
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "BootstrapServer": "zyxabc.servicebus.windows.net:9093",
    "ConfluentCloudUsername": "$ConnectionString",
    "ConfluentCloudPassword": "Endpoint=sb://zyxabc.servicebus.windows.net/;SharedAccessKeyName=TestSvc;SharedAccessKey=YAr/="
  }
}
And of course, the initialization in Startup.cs:
public void Configure(IWebJobsBuilder builder)
{
    builder.AddKafka();
}
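For completeness, a minimal Startup.cs around that Configure method might look like this; the namespace and class name are assumptions based on the project above:

```csharp
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Hosting;

// Registers this class as the WebJobs startup hook for the Functions host.
[assembly: WebJobsStartup(typeof(EY.Disruptor.AzureFunctionsWithKafka.Startup))]

namespace EY.Disruptor.AzureFunctionsWithKafka
{
    public class Startup : IWebJobsStartup
    {
        public void Configure(IWebJobsBuilder builder)
        {
            // Wires up the Kafka bindings from azure-functions-kafka-extension.
            builder.AddKafka();
        }
    }
}
```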
I hope this recommendation helps other people :)
Upvotes: 1