Context
I have the local Azure EventHub Emulator running. I set it up with docker as shown here: https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator?source=recommendations&tabs=docker-linux-container
I am using an implementation of the Azure SDK's PluggableCheckpointStoreEventProcessor<TPartition> (which itself extends EventProcessor<TPartition>) to read from the EventHub. Link for the abstract class: https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.primitives.pluggablecheckpointstoreeventprocessor-1?view=azure-dotnet
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Storage.Blobs;

internal class BasicConsumer : PluggableCheckpointStoreEventProcessor<EventProcessorPartition>
{
    public BasicConsumer(
        BlobContainerClient storageClient,
        string connectionString,
        string consumerGroupName,
        string eventHubName,
        int batchSize)
        : base(
            checkpointStore: new BlobCheckpointStore(storageClient),
            eventBatchMaximumCount: batchSize,
            consumerGroup: consumerGroupName,
            connectionString: connectionString,
            eventHubName: eventHubName
        )
    {
    }

    // Called when the processor hits an error outside of the batch handler.
    protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken)
    {
        Trace.WriteLine("Processing error");
        return Task.CompletedTask;
    }

    // Called for each batch of events read from a partition.
    protected override Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, EventProcessorPartition partition, CancellationToken cancellationToken)
    {
        Trace.WriteLine("Processing batch");
        return Task.CompletedTask;
    }

    // Called when the processor claims a partition and is about to read from it.
    protected override async Task OnInitializingPartitionAsync(EventProcessorPartition partition, CancellationToken cancellationToken)
    {
        Trace.WriteLine("Initializing partition " + partition.PartitionId);
        await base.OnInitializingPartitionAsync(partition, cancellationToken).ConfigureAwait(false);
    }
}
I am using the recommended Azurite connection string and the EventHub Emulator connection string, and setting up BasicConsumer like so:
var blobConnection = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;";
var eventHubConnection = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;";
var storageClient = new BlobContainerClient(blobConnection, "localhost-test");
var basicConsumer = new BasicConsumer(storageClient, eventHubConnection, consumerGroupName, eventHubName, batchSize: 16);
Problem
Unfortunately, I do not see any sign of the BasicConsumer class reading from the local EventHub, even though it should behave the same as any other EventHub. I don't see the Azurite-based storageClient being created, nor any traces from the overridden event handlers.
Previously, when using EventProcessor from the legacy Azure SDK, these event handlers did log/trace when a partition opened; since upgrading, they no longer do.
My current test scenario is to start the EventHub Emulator (Docker-based, which also runs Azurite), run Azurite separately as well, and simply instantiate a BasicConsumer. I also wait to see whether the BasicConsumer begins reading after some time, but to no avail.
How can I read from the local EventHub?
Exceptions found when Debugging
{"Service request failed.\r\nStatus: 404 (The specified container does not exist.)
\r\nErrorCode: ContainerNotFound\r\n\r\nHeaders:\r\nServer: Azurite-Blob/3.31.0
\r\nx-ms-error-code: ContainerNotFound\r\nx-ms-request-id: 2a0f9015-01ff-43e5-b616-6b221da347f2\r
\nDate: Wed, 31 Jul 2024 06:42:43 GMT\r\nConnection: keep-alive\r\nKeep-Alive: REDACTED\r\n"}
{"The messaging entity 'sb://emulatorns1.eventhubs.emulator.net/testeventhub' could not be found.
To know more visit https://aka.ms/sbResourceMgrExceptions. (TestEventHub). For troubleshooting information,
see https://aka.ms/azsdk/net/eventhubs/exceptions/troubleshoot"}
From the above exceptions I saw when debugging, it looks like:
Upvotes: 0
Views: 739
1. Call StartProcessingAsync()
After debugging for a while, I learned a couple of things that had been unclear.
Simply constructing your implementation of the PluggableCheckpointStoreEventProcessor does not start any event ingestion process. You must call StartProcessingAsync() on the consumer to actually begin processing events.
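As a minimal sketch of what that looks like, the helper below starts a processor, lets it run for a while, and then stops it. The class name ConsumerRunner, the method name RunForAsync, and the run duration are hypothetical choices for illustration. Only StartProcessingAsync() and StopProcessingAsync() come from the SDK's EventProcessor<TPartition> base class.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Primitives;

// Hypothetical helper (not part of the Azure SDK): runs a processor such as
// BasicConsumer for a fixed amount of time and then shuts it down cleanly.
internal static class ConsumerRunner
{
    public static async Task RunForAsync(
        EventProcessor<EventProcessorPartition> processor,
        TimeSpan duration,
        CancellationToken cancellationToken = default)
    {
        // Nothing is read from the Event Hub until this call is made.
        await processor.StartProcessingAsync(cancellationToken);

        try
        {
            // The handlers (OnInitializingPartitionAsync, OnProcessingEventBatchAsync, ...)
            // are invoked in the background while we wait here.
            await Task.Delay(duration, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Cancellation is treated as a normal way to end the run.
        }
        finally
        {
            // Stop without the caller's token so shutdown still completes
            // when that token has already been cancelled.
            await processor.StopProcessingAsync();
        }
    }
}
```

With the basicConsumer from the question, this could be used as `await ConsumerRunner.RunForAsync(basicConsumer, TimeSpan.FromSeconds(30));`, where 30 seconds is an arbitrary value.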
2. Use provided Config.json and Namespace name
I used the Config.json file shown in the tutorial (https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator?source=recommendations&tabs=docker-linux-container) but simply renamed the file and environment variable. Renaming the config file to a custom name led to errors when running docker compose.
When using a custom file name, I only saw the following start in the CMD window:
However, upon using the out-of-the-box Config.json, a Network entity was also started (although I've also had runs where it didn't start):
Additionally, you must use the namespace name emulatorns1; a custom namespace name is simply overridden by the emulator.
3. Use a separate Azurite Emulator
When using an event processor provided by the Azure SDK, you must use a BlobContainerClient to checkpoint offsets when processing events. Although the EventHubs Emulator starts up its own instance of Azurite, the standard Azurite connection string won't be able to access it.
Start a separate instance of Azurite and be sure to call .CreateIfNotExists() on the BlobContainerClient when setting up your test environment.
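A sketch of that setup step, assuming the separate Azurite instance is reachable through the blob connection string from the question. The class name CheckpointContainer and the method name CreateAsync are hypothetical. CreateIfNotExistsAsync() is the asynchronous counterpart of .CreateIfNotExists() on BlobContainerClient. Creating the container up front should avoid the ContainerNotFound (404) error shown in the question.

```csharp
using System.Threading.Tasks;
using Azure.Storage.Blobs;

// Hypothetical helper (not part of the Azure SDK): returns a container client
// whose container is guaranteed to exist before the processor uses it.
internal static class CheckpointContainer
{
    public static async Task<BlobContainerClient> CreateAsync(
        string blobConnectionString,
        string containerName)
    {
        var client = new BlobContainerClient(blobConnectionString, containerName);

        // Creates the container only when it is missing; an existing
        // container is left untouched.
        await client.CreateIfNotExistsAsync();

        return client;
    }
}
```

For example: `var storageClient = await CheckpointContainer.CreateAsync(blobConnection, "localhost-test");`, and then pass storageClient to the BasicConsumer constructor as before.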
Upvotes: 1