I'm using Microsoft.Azure.EventHubs.Processor to consume an Event Hub with 32 partitions in parallel, with up to 4 hosts running in different service instances.
While all 4 services are up and running, they steal each other's partition leases over and over. As a result, events are processed again and again.
My IEventProcessor currently looks like this:
public class BaseEventProcessor : IEventProcessor
{
    readonly TimeSpan DefaultCheckpointInterval = TimeSpan.FromMinutes(1);
    readonly TimeSpan _checkpointInterval;
    Stopwatch _checkpointStopWatch;

    public BaseEventProcessor(TimeSpan? checkpointInterval = null)
    {
        _checkpointInterval = checkpointInterval ?? DefaultCheckpointInterval;
    }

    public virtual Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        // some log code
        return Task.CompletedTask;
    }

    public virtual async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        // some log code
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    public virtual Task OpenAsync(PartitionContext context)
    {
        // some log code
        _checkpointStopWatch = new Stopwatch();
        _checkpointStopWatch.Start();
        return Task.CompletedTask;
    }

    public virtual async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // some processing code
        if (messages.Count() > 0 && _checkpointStopWatch.Elapsed >= _checkpointInterval)
        {
            await context.CheckpointAsync();
            _checkpointStopWatch.Restart();
        }
    }
}
The processor is initialized with these options:
new EventProcessorOptions
{
    PrefetchCount = 200,
    MaxBatchSize = 100,
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnd(),
    InvokeProcessorAfterReceiveTimeout = true,
    ReceiveTimeout = TimeSpan.FromSeconds(30),
    EnableReceiverRuntimeMetric = true
}
And the partition manager with these options:
new PartitionManagerOptions
{
    RenewInterval = TimeSpan.FromSeconds(10),
    LeaseDuration = TimeSpan.FromSeconds(60)
}
These errors are reported to the ProcessErrorAsync method:
The lease ID specified did not match the lease ID for the blob.
-
Receiver 'dadf82a9-d27a-4af6-b482-5158c23bebe0' with a higher epoch '14' already exists. Receiver '65bc9d06-c09b-4ab5-af62-75e05ecaa88a' with epoch 11 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected
-
New receiver 'ed4fbbcd-5896-40d2-adc9-55feb77f6564' with higher epoch of '12' is created hence current receiver '65bc9d06-c09b-4ab5-af62-75e05ecaa88a' with epoch '11' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.
What am I doing wrong here? Do I need to fine-tune the options, or do I need to react to the errors somehow?
Thanks, here are my responses to your points:
1. Actually, I do see storage errors with status codes 409 and 412. Thanks for pointing that out.
2. (see 1)
3. This should be fine. It's constantly monitored.
4. I don't get this. There are other consumers within the same consumer group, and they are supposed to share the partitions. But they all have unique HostName values.
5. Oh, OK, I didn't know that. The storage account is used by multiple different Event Hubs and consumer groups (all with different names). Can you give more details on why a dedicated storage account is needed?
6. I will disable it.
Upvotes: 1
A couple of things to check:
1. Make sure to log errors from both the IProcessorHost error handler and the EventProcessorOptions error handler.
2. See if the clients are logging any storage exceptions. This is important for identifying whether the lost leases are caused by storage I/O failures.
3. Check client-side resource utilization such as CPU, thread starvation, and available memory. High resource utilization can delay task scheduling and thus cause I/O timeouts.
4. Make sure no other receivers are consuming from the same consumer group. Note that there can be only one epoch receiver per partition at a time. Each host group should receive from a dedicated consumer group. Receivers may observe ReceiverDisconnectedExceptions if two host groups start competing for the same consumer group.
5. Dedicate a single storage account per consumer group. Basically, don't share the storage account with any other service or consumer. This is recommended because of storage throttling: if some other service causes high storage I/O and therefore throttling, this may also impact lease operations.
6. Make sure soft-delete is disabled for the storage account.
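For the first point, here is a minimal sketch of logging from both places, assuming the Microsoft.Azure.EventHubs.Processor package used in the question. The class names HostSetup and LoggingEventProcessor and the Console logging are placeholders for illustration, not part of the library; swap in whatever logger you already use.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

// Hypothetical helper: builds options with a host-level exception handler attached.
public static class HostSetup
{
    public static EventProcessorOptions CreateOptions()
    {
        var options = new EventProcessorOptions
        {
            PrefetchCount = 200,
            MaxBatchSize = 100
        };

        // Called for errors raised by the host itself (for example lease and
        // storage operations), which do not necessarily reach the processor.
        options.SetExceptionHandler(args =>
            Console.Error.WriteLine(
                $"[host] host={args.Hostname} partition={args.PartitionId} " +
                $"action={args.Action} error={args.Exception}"));

        return options;
    }
}

// Hypothetical processor: only the error logging is of interest here.
public class LoggingEventProcessor : IEventProcessor
{
    public Task OpenAsync(PartitionContext context) => Task.CompletedTask;

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        // Logging the close reason shows how often a partition is taken away
        // (LeaseLost) as opposed to a clean Shutdown.
        Console.Error.WriteLine(
            $"[processor] partition={context.PartitionId} closed, reason={reason}");
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        => Task.CompletedTask;

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        // Tag the two exception types that typically accompany lease stealing.
        string kind =
            error is ReceiverDisconnectedException ? "receiver-disconnected" :
            error is LeaseLostException ? "lease-lost" :
            "other";

        Console.Error.WriteLine(
            $"[processor] partition={context.PartitionId} kind={kind} error={error}");
        return Task.CompletedTask;
    }
}
```

Pass the options from CreateOptions() when registering the processor on the EventProcessorHost. Comparing the two log streams should show whether the lost leases line up with storage errors (point 2) or with competing receivers (point 4).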
Upvotes: 1