Lubu
Lubu

Reputation: 123

Event Hub Checkpoint Data is Not Saved

I am running the event hub receiver implementation from: https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send#create-a-python-script-to-receive-events and I haven't changed anything except the connection strings etc.

After creating hundreds of events, I can see that the receiver created checkpoint folder inside the storage account, but as I run the receiver again I see that it processes the same events.

The files that are created per partitions also are empty.

The storage is provided in the consumerClient:

checkpoint_store = BlobCheckpointStore.from_connection_string("...", "eventhubcontainer")


client = EventHubConsumerClient.from_connection_string("...", consumer_group="$Default", eventhub_name="eventhub1", checkpoint_store=checkpoint_store)

Also after reading the event there is method saving the checkpoint:

await partition_context.update_checkpoint(event)

Am I missing something here?


The whole code:

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

async def on_event(partition_context, event):
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    async with client:
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Upvotes: 1

Views: 1254

Answers (2)

Ivan Glasenberg
Ivan Glasenberg

Reputation: 30035

This might be a bug in the old sdk.

Please try to install the latest version sdk: azure-eventhub 5.3.1 and azure-eventhub-checkpointstoreblob-aio 1.1.3.

I tested your code with these latest sdk, it works fine.

Upvotes: 2

Lubu
Lubu

Reputation: 123

Updating the SDK solved the problem. Also @AdamLing clarified the metadata location for me in the comment.

Upvotes: 0

Related Questions