Brian Lee
Brian Lee

Reputation: 173

Is there any way to close Eventhubconsumerclient.receive if it has been a while without any messages?

I just implemented eventhub with version 5, which has been changed a bit from previous version.

current running code is following:

consumer_client = EventHubConsumerClient.from_connection_string(conn_str=CONNECTION_STR,
                                                  consumer_group='fconsumer',
                                                  eventhub_name=EVENTHUB_NAME)
consumer_client.receive(on_event=on_event, 
                        partition_id = "0", 
                        track_last_enqueued_event_properties=False, 
                        starting_position="@latest")

By adding an argument for the time duration (or keep_alive ag from prev version), I would make it stop receiving messages and close it after a certain amount of time. Is this possible?

Upvotes: 1

Views: 738

Answers (1)

Serkant Karaca
Serkant Karaca

Reputation: 2034

consumer_client.receive(...) will be a blocking call and it won't return on its own. You need to create a thread for consuming events and in the main thread you can get to decide when to close the consumer client. Sample code snippet as below...

thread = threading.Thread(
    target=consumer_client.receive,
    kwargs={
        "on_event": on_event,
        "on_partition_initialize": on_partition_initialize,
        "on_partition_close": on_partition_close,
        "on_error": on_error,
        "starting_position": "-1",  # "-1" is from the beginning of the partition.
    }
)

thread.daemon = True
thread.start()
time.sleep(RECEIVE_DURATION)
consumer_client.close()
thread.join()

Upvotes: 2

Related Questions