Steffen Mangold

Reputation: 1144

EventHubProcessor leases are stolen between hosts over and over

Problem

I'm using Microsoft.Azure.EventHubs.Processor to consume an Event Hub with 32 partitions in parallel, with up to 4 hosts running in different service instances.

While all 4 services are up and running, they keep stealing the partition leases from each other over and over. As a result, events are processed again and again.

[Image: event flow]

Setup

My IEventProcessor currently looks like this:

public class BaseEventProcessor : IEventProcessor
{
    readonly TimeSpan DefaultCheckpointInterval = TimeSpan.FromMinutes(1);
    readonly TimeSpan _checkpointInterval;
    Stopwatch _checkpointStopWatch;

    public BaseEventProcessor(TimeSpan? checkpointInterval = null)
    {
        _checkpointInterval = checkpointInterval ?? DefaultCheckpointInterval;
    }

    public virtual Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        // some log code

        return Task.CompletedTask;
    }

    public virtual async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        // some log code

        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    public virtual Task OpenAsync(PartitionContext context)
    {
        // some log code

        _checkpointStopWatch = new Stopwatch();
        _checkpointStopWatch.Start();

        return Task.CompletedTask;
    }

    public virtual async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // some processing code

        if (messages.Count() > 0 && _checkpointStopWatch.Elapsed >= _checkpointInterval)
        {
            await context.CheckpointAsync();
            _checkpointStopWatch.Restart();
        }
    }
}

The processor is initialized with these options:

new EventProcessorOptions
{
    PrefetchCount = 200,
    MaxBatchSize = 100,
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnd(),
    InvokeProcessorAfterReceiveTimeout = true,
    ReceiveTimeout = TimeSpan.FromSeconds(30),
    EnableReceiverRuntimeMetric = true
}

And the partition manager with these options:

new PartitionManagerOptions
{
    RenewInterval = TimeSpan.FromSeconds(10),
    LeaseDuration = TimeSpan.FromSeconds(60)
}

Output

These errors are reported to the ProcessErrorAsync method:

The lease ID specified did not match the lease ID for the blob.

-

Receiver 'dadf82a9-d27a-4af6-b482-5158c23bebe0' with a higher epoch '14' already exists. Receiver '65bc9d06-c09b-4ab5-af62-75e05ecaa88a' with epoch 11 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected

-

New receiver 'ed4fbbcd-5896-40d2-adc9-55feb77f6564' with higher epoch of '12' is created hence current receiver '65bc9d06-c09b-4ab5-af62-75e05ecaa88a' with epoch '11' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.

Question

What am I doing wrong here? Do I need to fine-tune the options, or do I need to react to the errors somehow?


Answer to @Serkant Karaca

Thanks, here are my responses to your points:

  1. Actually, I do see errors with status codes 409 and 412. Thanks for pointing that out.

  2. (see 1)

  3. This should be fine. It's constantly monitored.

  4. I don't understand this. There are other consumers within the same consumer group; they are supposed to share the partitions. They all have unique HostName values, though.

  5. Oh OK, I didn't know that. The storage account is used by multiple different Event Hubs and consumer groups (all with different names). Can you give more details on why a single storage account per consumer group is needed?

  6. I will disable it.

Upvotes: 1

Views: 679

Answers (1)

Serkant Karaca

Reputation: 2032

A couple of things to check:

  1. Make sure to log errors from both IProcessorHost error handler and EventProcessorOptions error handler.

  2. Check whether the clients are logging any storage exceptions. This is important for determining whether leases are being lost because of storage I/O failures.

  3. Check client-side resource utilization such as CPU, thread starvation, and available memory. High resource utilization can delay task scheduling and thus cause I/O timeouts.

  4. Make sure no other receivers are consuming from the same consumer group. Please note that there can be only one epoch receiver per partition at a time. Each host group should receive from a dedicated consumer group. Receivers may observe ReceiverDisconnectedExceptions if two host groups start competing for the same consumer group.

  5. Dedicate a single storage account per consumer group. Basically, don't share the storage account with any other service or consumer. This is recommended because of storage throttling: if some other service causes high storage I/O and is throttled as a result, lease operations may be affected too.

  6. Make sure soft-delete is disabled for the storage account.
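
As a rough illustration of points 1 and 5, the host-level exception handler can be registered through EventProcessorOptions.SetExceptionHandler, and the lease storage can be passed to the host explicitly. This is only a sketch: the HostSetup class, the StartAsync method, and all of its parameters are hypothetical names chosen for the example, and the factory is assumed to create the processor shown in the question. The option values are copied from the question, not recommendations.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs.Processor;

public static class HostSetup
{
    // Hypothetical helper: every parameter is a placeholder supplied by the caller.
    public static async Task<EventProcessorHost> StartAsync(
        string hostName,                  // must be unique per host instance
        string eventHubPath,
        string consumerGroup,             // used by this host group only
        string eventHubConnectionString,
        string storageConnectionString,   // storage account not shared with other workloads
        string leaseContainerName,
        IEventProcessorFactory factory)   // assumed to create the processor from the question
    {
        var host = new EventProcessorHost(
            hostName,
            eventHubPath,
            consumerGroup,
            eventHubConnectionString,
            storageConnectionString,
            leaseContainerName);

        // Same lease settings as in the question.
        host.PartitionManagerOptions = new PartitionManagerOptions
        {
            RenewInterval = TimeSpan.FromSeconds(10),
            LeaseDuration = TimeSpan.FromSeconds(60)
        };

        var options = new EventProcessorOptions
        {
            PrefetchCount = 200,
            MaxBatchSize = 100
        };

        // Host-level handler: logs errors raised by the host itself
        // (for example during lease or partition management), in addition
        // to whatever IEventProcessor.ProcessErrorAsync logs per partition.
        options.SetExceptionHandler(e =>
            Console.Error.WriteLine(
                $"host={e.Hostname} partition={e.PartitionId} action={e.Action} error={e.Exception}"));

        await host.RegisterEventProcessorFactoryAsync(factory, options);
        return host;
    }
}
```

Logging from both handlers should make it easier to tell whether the 409/412 responses come from lease operations against storage or from receivers competing on the Event Hubs side.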

Upvotes: 1
