C Connell

Reputation: 61

Getting no messages when reading Azure Event Hub with Python using EventHubClient

I have an Azure Event Hub with messages in it. I wrote the messages with a Python app, and the Event Hub GUI shows the correct message count. But I cannot read the messages back with Python. My code is below. It runs with no errors but returns none of the messages.

Oddly, after I run this code, the Event Hub GUI shows all the messages (a couple thousand) as outgoing, which suggests that my program actually received them. But the code never displays them.

Any help appreciated!

The result is always:

Msg offset: <azure.eventhub.common.Offset object at 0x102fc4e10>
Msg seq: 0
Msg body: 0

Received 1 messages in 0.11292386054992676 seconds

++++++++++++

# pip install azure-eventhub

import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

logger = logging.getLogger("azure")

# URL of the event hub, amqps://<mynamespace>.servicebus.windows.net/myeventhub
ADDRESS = "amqps://chc-eh-ns.servicebus.windows.net/chc-eh"

# Access tokens for event hub namespace, from Azure portal for namespace
USER = "RootManageSharedAccessKey"
KEY = "XXXXXXXXXXXXXXXXXXXXXXXXXX"

# Additional setup to receive events
CONSUMER_GROUP = "$default"   # our view of the event hub, useful when there is more than one consumer at same time
PARTITION = "0"   # which stream within event hub
OFFSET = Offset("-1")  # get all msgs in event hub. msgs are never removed, they just expire per event hub settings
PREFETCH = 100   # not sure exactly what this does ??

# Initialize variables
total = 0
last_sn = -1
last_offset = -1

client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=PREFETCH, offset=OFFSET)
    client.run()
    start_time = time.time()
    for event_data in receiver.receive(timeout=100):
        last_offset = event_data.offset
        last_sn = event_data.sequence_number
        print("Msg offset: " + str(last_offset))
        print("Msg seq: " + str(last_sn))
        print("Msg body: " + event_data.body_as_str())
        total += 1

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("\nReceived {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass

finally:
    client.stop()

Upvotes: 1

Views: 1191

Answers (1)

C Connell

Reputation: 61

Got it! The code sample I copied was wrong. Each call to receive() returns one batch of messages, so you have to keep calling it and iterate over each batch until an empty batch comes back. Here is the corrected code:

# pip install azure-eventhub

import time
from azure.eventhub import EventHubClient, Offset

# URL of the event hub, amqps://<mynamespace>.servicebus.windows.net/myeventhub
ADDRESS = "amqps://chc-eh-ns.servicebus.windows.net/chc-eh"

# Access tokens for event hub namespace, from Azure portal for namespace
USER = "RootManageSharedAccessKey"
KEY = "XXXXXXXXXXXX"

# Additional setup to receive events
CONSUMER_GROUP = "$default"   # our view of the event hub, useful when there is more than one consumer at same time
PARTITION = "0"   # which stream within event hub
OFFSET = Offset("-1")  # get all msgs in event hub. msgs are never removed, they just expire per event hub settings
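PREFETCH = 100   # how many events the receiver buffers ahead of time (the batch size is set separately via receive(max_batch_size=...))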

# Initialize variables
total = 0
last_sn = -1
last_offset = -1

client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=PREFETCH, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=5000)
    while batch:
        for event_data in batch:
            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Msg offset: " + str(last_offset))
            print("Msg seq: " + str(last_sn))
            print("Msg body: " + event_data.body_as_str())
            total += 1
        batch = receiver.receive(timeout=5000)
    end_time = time.time()
    run_time = end_time - start_time
    print("\nReceived {} messages in {} seconds".format(total, run_time))
    # client.stop() is called once, in the finally block below

except KeyboardInterrupt:
    pass

finally:
    client.stop()
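
To make the batch loop easier to see (and to test without a live Event Hub), here is a minimal sketch of the same pattern on its own. `drain_batches` and `FakeReceiver` are names I made up for illustration and are not part of azure-eventhub. The sketch only assumes that `receive()` returns a list and that an empty list means nothing more arrived before the timeout.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def drain_batches(receive: Callable[[], List[T]]) -> Iterator[T]:
    """Yield items from successive batches until an empty batch comes back."""
    batch = receive()
    while batch:
        for item in batch:
            yield item
        batch = receive()


class FakeReceiver:
    """Hypothetical in-memory stand-in for a receiver, for illustration only."""

    def __init__(self, events, batch_size):
        self._events = list(events)
        self._batch_size = batch_size

    def receive(self, timeout=None):
        # Hand back up to batch_size events; an empty list means none are left.
        batch = self._events[:self._batch_size]
        self._events = self._events[self._batch_size:]
        return batch


if __name__ == "__main__":
    fake = FakeReceiver(["msg-%d" % i for i in range(7)], batch_size=3)
    total = 0
    for body in drain_batches(lambda: fake.receive(timeout=5)):
        print("Msg body: " + body)
        total += 1
    print("Received {} messages".format(total))
```

With a real receiver, the same helper would presumably be called as `drain_batches(lambda: receiver.receive(timeout=...))`, with the per-event handling moved into the `for` loop.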

Upvotes: 1
