Reputation: 2234
I want to receive only the latest messages from an Event Hub, because a large number of messages were sent to it earlier. I created my own consumer group, but it seems I can only receive messages from the earliest one up to now.
Is there any way to skip ahead to the latest messages?
Upvotes: 1
Views: 2431
Reputation: 29940
If you are familiar with checkpoints in Azure Event Hubs, this should be easy to achieve.
In short, a checkpoint is a file stored in Azure Blob Storage. When your reader checkpoints while reading from the Event Hub, the current offset is recorded in that file. The next time you read from the Event Hub using the same checkpoint, reading resumes from that offset.
So if you want to skip the old data, you can first create a test receiver project like this and let it set the checkpoint. Then, in production, use the same checkpoint: reading starts from the recorded offset, so the old data is skipped.
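For illustration, here is a minimal sketch of a processor that writes a checkpoint after each batch, assuming the Microsoft.Azure.EventHubs.Processor package. The class name SimpleEventProcessor matches the one used in the sample further down; the console messages and the choice to checkpoint after every batch are my own assumptions, not a requirement.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

public class SimpleEventProcessor : IEventProcessor
{
    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"Opened partition {context.PartitionId}");
        return Task.CompletedTask;
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Closed partition {context.PartitionId}: {reason}");
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on partition {context.PartitionId}: {error.Message}");
        return Task.CompletedTask;
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // Body is an ArraySegment<byte>; decode only the segment that holds the payload.
            var body = Encoding.UTF8.GetString(
                eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Partition {context.PartitionId}: {body}");
        }

        // Records the offset of the last event in this batch to blob storage.
        // A later run that uses the same checkpoint resumes after this point.
        await context.CheckpointAsync();
    }
}
```

Checkpointing after every batch is the simplest choice for a test receiver; on a busy hub you may prefer to checkpoint less often to reduce writes to storage.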
Another way is to use EventProcessorOptions with the RegisterEventProcessorAsync method. If you choose this approach, you need to manually remove the existing checkpoint from Azure Blob Storage; otherwise the checkpoint overrides this setting.
A sample is shown below, in your receiver method:
private static async Task MainAsync(string[] args)
{
    Console.WriteLine("Registering EventProcessor...");

    var eventProcessorHost = new EventProcessorHost(
        EventHubName,
        PartitionReceiver.DefaultConsumerGroupName,
        EventHubConnectionString,
        StorageConnectionString,
        StorageContainerName);

    // Start from events enqueued within the last hour; older events are skipped.
    var options = new EventProcessorOptions
    {
        InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow.AddHours(-1))
    };

    // Registers the Event Processor Host and starts receiving messages
    await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options);

    Console.WriteLine("Receiving. Press ENTER to stop worker.");
    Console.ReadLine();

    // Disposes of the Event Processor Host
    await eventProcessorHost.UnregisterEventProcessorAsync();
}
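If you want only messages that arrive after the receiver starts, rather than the last hour, the same option can point at the end of each partition. This is a fragment, not a full program: it is meant to replace the options block inside MainAsync above and assumes the same eventProcessorHost variable and SimpleEventProcessor class. As before, it only takes effect when no checkpoint exists for the consumer group.

```csharp
// Start at the end of each partition: only events enqueued after the
// receiver starts are delivered; everything already in the hub is skipped.
var options = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnd()
};

await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options);
```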
Upvotes: 2