Reputation: 855
Hi, I am using Event Hub to ingest data at a frequency of one event per second.
I continuously push simulated data from a console application to the event hub and then store it in a SQL database.
It has now been running for more than 5 days, and I have found that once or twice every day my receiver processes the same data twice, which is why I get duplicate records in the database.
Since it happens only once or twice a day, I have not been able to trace it.
Has anyone faced this situation? Is it possible for the host to process the same messages twice? Or is it an issue with the async behavior of the receiver?
Looking forward to your help.
Code snippet:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class SimpleEventProcessor : IEventProcessor
{
    Stopwatch checkpointStopWatch;

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        // Checkpoint only on a clean shutdown, not when the lease is lost.
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        this.checkpointStopWatch = new Stopwatch();
        this.checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string data = Encoding.UTF8.GetString(eventData.GetBytes());
            // store data into SQL database / database call.
        }

        // Meant to checkpoint every 5 minutes so that the worker can resume from the last
        // checkpoint if it restarts. The interval is currently 0, so this runs on every batch.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(0))
        {
            await context.CheckpointAsync();
            this.checkpointStopWatch.Restart();
        }

        // Second checkpoint for any non-empty batch.
        if (messages.Count() > 0)
            await context.CheckpointAsync();
    }
}
Upvotes: 0
Views: 486
Reputation: 29870
Event Hub guarantees at-least-once delivery.
It has the following characteristics:
- low latency
- capable of receiving and processing millions of events per second
- at-least-once delivery
So you should expect this to happen occasionally.
Also take into account the case where a checkpoint has just been written, some more messages (let's call them A and B) are then processed, and the process fails before the next checkpoint. When the reading process starts again after the failure, consumption resumes from the last checkpointed message. In other words, messages A and B will be processed again.
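Since replays like this cannot be ruled out, one common approach is to make the write itself idempotent. Below is a minimal, self-contained sketch of that idea, not the Event Hubs API: `IdempotentStore` and `ReplayDemo` are hypothetical names, and the in-memory `HashSet` stands in for a unique key in the SQL table. It assumes each event can be identified by its partition id plus a per-partition sequence number (in the real processor that could be something like `EventData.SequenceNumber`, but treat that mapping as an assumption to verify against your SDK version).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a table with a unique key on
// (partition id, sequence number). Not part of any Event Hubs SDK.
public class IdempotentStore
{
    private readonly HashSet<(string Partition, long Sequence)> keys =
        new HashSet<(string Partition, long Sequence)>();
    private readonly List<string> rows = new List<string>();

    public IReadOnlyList<string> Rows => rows;

    // Returns true if the row was stored, false if the key was already present.
    public bool TryInsert(string partition, long sequence, string payload)
    {
        if (!keys.Add((partition, sequence)))
        {
            return false; // duplicate delivery, skip it
        }
        rows.Add(payload);
        return true;
    }
}

public static class ReplayDemo
{
    // Processes every event whose index is greater than 'checkpoint'
    // and returns how many of them the store rejected as duplicates.
    public static int ProcessFrom(IdempotentStore store, string[] events, int checkpoint)
    {
        int duplicates = 0;
        for (int seq = checkpoint + 1; seq < events.Length; seq++)
        {
            if (!store.TryInsert("0", seq, events[seq]))
            {
                duplicates++;
            }
        }
        return duplicates;
    }

    public static void Main()
    {
        var store = new IdempotentStore();
        string[] events = { "e0", "e1", "A", "B" };

        // First run: all four events are stored, but the process fails
        // when the last checkpoint written was at sequence number 1.
        ProcessFrom(store, events, -1);
        int lastCheckpoint = 1;

        // Restart: reading resumes after the checkpoint, so A and B arrive again.
        int duplicates = ProcessFrom(store, events, lastCheckpoint);

        Console.WriteLine($"Rows stored: {store.Rows.Count}, duplicates skipped: {duplicates}");
        // prints "Rows stored: 4, duplicates skipped: 2"
    }
}
```

In a real database the same effect would usually come from a unique constraint on those two columns, with the insert written so that a key violation is ignored rather than treated as a failure. Checkpointing more often only narrows the replay window; it does not remove it.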
Upvotes: 0