vishnu

Getting Data from EventHub is delayed

I have an EventHub configured in Azure, along with a consumer group for reading the data. It worked fine for some days, but suddenly the incoming data is delayed by around 3 days. I use a Windows Service on my server to consume the data, and I receive around 500 messages per minute. Can anyone help me figure out what is causing this?


Answers (1)

Peter Bons

It might be that you are processing the items too slowly. If events arrive faster than you process them, the backlog of unprocessed events keeps growing and you fall further and further behind.
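To put a delay like the one in the question into numbers: with a roughly constant arrival rate, a consumer that is D behind the newest event has about (arrival rate × D) events waiting, and that backlog changes by (arrival rate − processing rate) every minute. The sketch below assumes a constant arrival rate; the class and method names are hypothetical, and only the 500 events/minute figure comes from the question.

```csharp
using System;

// Hypothetical helper for back-of-the-envelope backlog arithmetic.
// Assumes events arrive at a constant rate.
public static class BacklogMath
{
    // Unprocessed events when the consumer is `delay` behind the newest event:
    // everything that arrived during that window is still waiting.
    public static long EventsBehind(double arrivalPerMinute, TimeSpan delay) =>
        (long)Math.Round(arrivalPerMinute * delay.TotalMinutes);

    // Backlog change per minute: positive means it grows, negative means it shrinks.
    public static double BacklogGrowthPerMinute(double arrivalPerMinute, double processedPerMinute) =>
        arrivalPerMinute - processedPerMinute;
}
```

For example, `BacklogMath.EventsBehind(500, TimeSpan.FromDays(3))` gives 500 × 4320 = 2,160,000, so a 3-day delay at 500 events/minute means roughly 2.16 million events are still waiting to be read.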

To get some insight into where you are in the event stream, you can use code like this:

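private void LogProgressRecord(PartitionContext context)
{
    // Skip the report if no NamespaceManager was configured
    if (namespaceManager == null)
        return;

    // Sequence number stored in the lease, i.e. the last checkpointed event of this partition
    var currentSeqNo = context.Lease.SequenceNumber;
    // Sequence number of the newest event in the partition (a management call to the service)
    var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber;
    // Number of events still between the checkpoint and the end of the partition
    var delta = lastSeqNo - currentSeqNo;

    logWriter.Write(
            $"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})",
            EventLevel.Informational);
}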

The namespaceManager is built like this:

namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz");

I call this logging method in the CloseAsync method:

public Task CloseAsync(PartitionContext context, CloseReason reason)
{
    LogProgressRecord(context);

    return Task.CompletedTask;
}

logWriter is just a logging class I used to write the info to blob storage.
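If you want to try the snippets without a blob-storage logger, a console stand-in with the same `Write(string, EventLevel)` shape is enough. This is a hypothetical class, not the answer's actual logWriter; it only assumes the call shape used in the snippets and `EventLevel` from `System.Diagnostics.Tracing`.

```csharp
using System;
using System.Diagnostics.Tracing;

// Hypothetical stand-in for the answer's logWriter (which wrote to blob storage).
// Writes to the console instead and remembers the last line it wrote.
public class ConsoleLogWriter
{
    public string LastLine { get; private set; } = "";

    public void Write(string message, EventLevel level)
    {
        // Prefix each line with a UTC timestamp (ISO 8601) and the severity level
        LastLine = $"{DateTime.UtcNow:O} [{level}] {message}";
        Console.WriteLine(LastLine);
    }
}
```

Assigning `logWriter = new ConsoleLogWriter();` in the processor is then enough to see the progress and timing lines locally.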

LogProgressRecord outputs messages like:

Last processed seqnr for partition 3: 32780931 of 32823804 in consumergroup 'telemetry' (lag: 42873)

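So when the lag is very high, you could be processing events that occurred a long time ago. In that case you need to scale up your processor (make each instance faster) or scale out (run more processor instances; the Event Processor Host spreads the partitions across them, so the number of partitions caps how far you can scale out).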
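Whether the processor will ever catch up follows from comparing rates: the backlog only drains if processing is faster than arrival, and then at the difference of the two. A minimal sketch, assuming you sum the per-partition `lag` values from the log and know your measured processing rate and the arrival rate (the class and method names are hypothetical):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: estimate how long it takes to clear the backlog.
public static class CatchUpEstimator
{
    // Total lag across partitions, e.g. the "lag" values from the progress log.
    public static long TotalLag(IEnumerable<long> partitionLags) => partitionLags.Sum();

    // Returns null when the backlog never shrinks (processing rate <= arrival rate).
    public static TimeSpan? TimeToCatchUp(long backlog, double processedPerMinute, double arrivalPerMinute)
    {
        double drainPerMinute = processedPerMinute - arrivalPerMinute;
        if (drainPerMinute <= 0)
            return null; // stays the same or grows: scale up or out
        return TimeSpan.FromMinutes(backlog / drainPerMinute);
    }
}
```

For example, a total lag of 50,000 events processed at 600 events/minute while 500 events/minute arrive drains at 100 events/minute, which takes about 500 minutes; at 500 events/minute processing the method returns null because the lag never shrinks.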

If you notice a lag, measure how long it takes to process a given number of items. You can then try to optimize performance and see whether it improves. We did it like this:

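public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
    // stopwatch: a System.Diagnostics.Stopwatch field on this processor instance
    try
    {
        stopwatch.Restart();

        // process items here

        stopwatch.Stop();

        // Store the position so a restarted or rebalanced processor resumes from here
        await context.CheckpointAsync();

        logWriter.Write(
            $"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.",
            EventLevel.Informational);
    }
    catch (Exception ex)
    {
        // Log the failure, then rethrow so the host's own error handling still sees it
        logWriter.Write($"Processing failed for partition {context.Lease.PartitionId}: {ex}", EventLevel.Error);
        throw;
    }
}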
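The "Processed N events in X ms" line can be turned into a rate and compared with the incoming rate. A minimal sketch with hypothetical names; note that the measurement covers only the processing part of a batch, and that each partition only sees its share of the traffic (roughly the total rate divided by the partition count, if events are spread evenly), so compare against that per-partition rate.

```csharp
using System;

// Hypothetical helper: convert a batch timing into events per minute.
public static class Throughput
{
    public static double EventsPerMinute(int eventCount, long elapsedMilliseconds)
    {
        if (elapsedMilliseconds <= 0)
            throw new ArgumentOutOfRangeException(nameof(elapsedMilliseconds));
        return eventCount * 60_000.0 / elapsedMilliseconds;
    }

    // True when the measured processing rate is at least the arrival rate.
    public static bool KeepsUp(int eventCount, long elapsedMilliseconds, double arrivalPerMinute) =>
        EventsPerMinute(eventCount, elapsedMilliseconds) >= arrivalPerMinute;
}
```

For example, 100 events in 6000 ms is 1000 events/minute, which keeps up with 500 events/minute; 10 events in 6000 ms (100 events/minute) does not.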
