Leonardo

Reputation: 11387

Azure Event Hub - How to consume events in parallel using the official SDK?

I've set up the following test:

Expectation:
Run

traces
| where message == "Event received"
| summarize count() by bin(timestamp,1s), cloud_RoleInstance
| render timechart 

and see something like:
[expected timechart screenshot: both pods processing events concurrently]

but instead I'm seeing this:
[actual timechart screenshot: pods processing events one at a time, alternating]

(this is a 3x run of 10k events each, to eliminate the "pod not warmed up" variable)

Note that there is no (or very little) overlap between the pods' activity, as if one of them were holding a lock that is mysteriously released at some point and picked up by the other pod.

Relevant Consumer code:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _processor = new EventProcessorClient(_storageClient, _consumerGroup, _hubConnection, _eventHubName);
    _processor.ProcessEventAsync += ProcessEventHandler;
    _processor.ProcessErrorAsync += ProcessErrorHandler;

    // Start the processing

    await _processor.StartProcessingAsync(stoppingToken);
}

internal async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    _logger.LogTelemetry("Event received");

    await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}

Upvotes: 0

Views: 1839

Answers (2)

Leonardo

Reputation: 11387

There is actually nothing wrong with the code above. We discussed it a bit on this GitHub issue, and I was able to observe the expected behavior when dealing with larger batches (500k events).

Here's a screenshot:
[screenshot: timechart showing overlapping activity from both pods]

Upvotes: 1

Jesse Squire

Reputation: 7745

As described, your high-level scenario is already set up to consume in parallel.

Each EventProcessorClient works independently, though they coordinate via storage to split ownership of partitions between them. In this case, each processor should own 5 partitions which they will claim over ~60-90 seconds with the default configuration, after which time ownership should be stable.
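If that initial claiming window matters for your test runs, the load-balancing cadence can be tuned through `EventProcessorClientOptions`. A minimal sketch, assuming the `Azure.Messaging.EventHubs.Processor` package; the variable names mirror the fields in the question, and the specific interval values are illustrative, not recommendations:

```csharp
// Speed up how quickly processors claim and stabilize partition ownership.
var options = new EventProcessorClientOptions
{
    // Greedy claims all unowned partitions as quickly as possible, instead of
    // one partition per load-balancing cycle (the Balanced default).
    LoadBalancingStrategy = LoadBalancingStrategy.Greedy,

    // Shorter intervals make ownership converge faster, at the cost of more
    // calls against the blob storage checkpoint store.
    LoadBalancingUpdateInterval = TimeSpan.FromSeconds(5),
    PartitionOwnershipExpirationInterval = TimeSpan.FromSeconds(15)
};

var processor = new EventProcessorClient(
    _storageClient, _consumerGroup, _hubConnection, _eventHubName, options);
```

Note that shortening these intervals increases storage traffic and makes ownership churn more likely on transient delays, so the defaults are usually the right choice outside of testing.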

For each partition owned by the processor, an independent background task works to read events from Event Hubs and dispatch them to your handler. Your handler will be invoked concurrently across partitions, though the processor guarantees only a single active invocation for a given partition.

The results that you're seeing would indicate that something is amiss, though there's limited context to speculate as to why. A couple of observations/questions from the snippet:

  • ExecuteAsync will exit immediately once the processor starts; if nothing else blocks to keep the host process alive, the process may be terminating.

  • _logger will be called concurrently by different threads.

  • ProcessEventHandler is not accounting for exceptions; if it throws, the task responsible for processing the partition will fault. Depending on your host environment, it may be restarted or the host process may crash. We strongly recommend following the guidance for processor handlers.

  • Adding a checkpoint for each event is supported, but will have a negative impact on throughput. For most scenarios we recommend checkpointing either after X number of events or some fixed time interval has passed, with those values being dictated by the number of events your application is comfortable reprocessing.

I'm happy to help you dig into what may be causing the clustering behavior that you're seeing, but Stack Overflow probably isn't the best venue for doing so. You may wish to file an issue in the Azure SDK for .NET repository where we can work through things.

Upvotes: 2
