Reputation: 11387
I've set up the following test:
Expectation:
Run:
traces
| where message == "Event received"
| summarize count() by bin(timestamp,1s), cloud_RoleInstance
| render timechart
(this is a 3x run of 10k events each, to eliminate the "pod not warmed up" variable)
Note that there is no (or very little) overlap between the pods' activity, as if one of them were holding a lock and then, mysteriously, at some point the lock is released and picked up by the other pod.
Relevant Consumer code:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _processor = new EventProcessorClient(_storageClient, _consumerGroup, _hubConnection, _eventHubName);

    _processor.ProcessEventAsync += ProcessEventHandler;
    _processor.ProcessErrorAsync += ProcessErrorHandler;

    // Start the processing
    await _processor.StartProcessingAsync(stoppingToken);
}

internal async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    _logger.LogTelemetry("Event received");
    await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
Upvotes: 0
Views: 1839
Reputation: 11387
There is actually nothing wrong with the code above. On this GitHub issue we discussed it a bit, and I was able to observe the expected behavior when dealing with larger batches (500k events).
Upvotes: 1
Reputation: 7745
As described, your high-level scenario is already set up to consume in parallel.
Each EventProcessorClient works independently, though they coordinate via storage to split ownership of partitions between them. In this case, each processor should own 5 partitions, which it will claim over ~60-90 seconds with the default configuration, after which ownership should be stable.
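If it helps to see where those timings come from, the claim cadence is driven by a couple of EventProcessorClientOptions settings. This is only a minimal sketch for reference, assuming _storageClient is your BlobContainerClient and _hubConnection is the connection string, as in your snippet; the defaults noted in the comments are to the best of my knowledge:

var options = new EventProcessorClientOptions
{
    // How often each processor runs a load balancing cycle and attempts to claim partitions (default: 10 seconds).
    LoadBalancingUpdateInterval = TimeSpan.FromSeconds(10),

    // How long a claimed partition stays owned before another processor may claim it (default: 30 seconds).
    PartitionOwnershipExpirationInterval = TimeSpan.FromSeconds(30),

    // Balanced claims partitions gradually over several cycles; Greedy claims eligible partitions as quickly as possible.
    LoadBalancingStrategy = LoadBalancingStrategy.Balanced
};

_processor = new EventProcessorClient(_storageClient, _consumerGroup, _hubConnection, _eventHubName, options);

You shouldn't need to change any of these to get parallel consumption; they only influence how quickly ownership stabilizes.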
For each partition owned by the processor, an independent background task reads events from Event Hubs and dispatches them to your handler. Your handler will be invoked concurrently across partitions, though the processor guarantees a single active invocation for a given partition.
The results that you're seeing would indicate that something is amiss, though there's limited context to speculate as to why. A couple of observations/questions from the snippet:
- ExecuteAsync will exit immediately once the processor starts; if something else is not blocking to keep the host process alive, it may be terminating.
- _logger will be called concurrently by different threads; if it is not safe for concurrent use, that could cause issues.
- ProcessEventHandler is not accounting for exceptions; if it throws, the task responsible for processing the partition will fault. Depending on your host environment, it may be restarted or the host process may crash. We strongly recommend following the guidance for processor handlers.
- Adding a checkpoint for each event is supported, but will have a negative impact on throughput. For most scenarios we recommend checkpointing either after X number of events or after some fixed time interval has passed, with those values dictated by the number of events your application is comfortable reprocessing. (A sketch that combines these suggestions follows this list.)
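To make that concrete, here's a rough sketch that keeps the worker alive, guards the handler against exceptions, and checkpoints every N events per partition. It's only an illustration: the 100-event threshold, the _eventsSinceCheckpoint dictionary, and reusing LogTelemetry for the failure message are assumptions layered on top of your snippet, not a prescribed implementation:

// Requires System.Collections.Concurrent in addition to Azure.Messaging.EventHubs.Processor.
private readonly ConcurrentDictionary<string, int> _eventsSinceCheckpoint = new ConcurrentDictionary<string, int>();
private const int EventsPerCheckpoint = 100;   // illustrative; tune to how much reprocessing you can tolerate

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _processor = new EventProcessorClient(_storageClient, _consumerGroup, _hubConnection, _eventHubName);

    _processor.ProcessEventAsync += ProcessEventHandler;
    _processor.ProcessErrorAsync += ProcessErrorHandler;

    await _processor.StartProcessingAsync(stoppingToken);

    try
    {
        // Keep the background service alive until shutdown is requested;
        // otherwise ExecuteAsync returns as soon as the processor starts.
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }
    catch (TaskCanceledException)
    {
        // Expected when the host signals shutdown.
    }
    finally
    {
        await _processor.StopProcessingAsync();
    }
}

internal async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        _logger.LogTelemetry("Event received");

        // Checkpoint every N events per partition rather than on every event.
        var partition = eventArgs.Partition.PartitionId;
        var count = _eventsSinceCheckpoint.AddOrUpdate(partition, 1, (_, current) => current + 1);

        if (count >= EventsPerCheckpoint)
        {
            _eventsSinceCheckpoint[partition] = 0;
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    }
    catch (Exception ex)
    {
        // Never let exceptions escape the handler; a thrown exception faults
        // the task that processes this partition.
        _logger.LogTelemetry($"Event processing failed: {ex.Message}");
    }
}

A time-based variant works the same way, tracking the last checkpoint time per partition instead of a count.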
I'm happy to help you dig into what may be causing the clustering behavior that you're seeing, but Stack Overflow probably isn't the best venue for doing so. You may wish to file an issue in the Azure SDK for .NET repository where we can work through things.
Upvotes: 2