Brian Lee
Brian Lee

Reputation: 173

In new version azure eventhub, how to close consumer_client after timeout

I am receiving data through azure eventhub and trying to close if I am receiving no more data for like 10 or 15 seconds.

I tried to implement arguments auth_timeout and idel_timeout in _consumer_client, but neither worked.

I am also referring to this example.

There's "on_error" function that may function to close the client when there's no further message being consumed.

def on_event(partition_context, event):

    ## My Code##

    # Put your code here. to do some operations on the event.

    print("Received event from partition {}.".format(partition_context.partition_id))
    print("Last enqueued event properties from partition: {} is: {}.".format(
        partition_context.partition_id,t))    
def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            partition_context.partition_id,
            error
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))
consumer_client = EventHubConsumerClient.from_connection_string(conn_str=CONNECTION_STR,consumer_group='forceconsummer',eventhub_name=EVENTHUB_NAME, idle_timeout = 30, auth_timeout = 10)
consumer_client.receive(on_event=on_event, partition_id = "2", track_last_enqueued_event_properties=False, on_error=on_error, starting_position="@latest")

How can I make it work to be closed automatically after a timeout?

Upvotes: 0

Views: 1336

Answers (3)

Adam Ling
Adam Ling

Reputation: 126

I think you could utilize the on_event or on_event_batch callback to achieve the goal.

There is a max_wait_time key-word argument that you could pass to the receive or receive_batch method. You could use None/Empty array as an indicator to track how long you haven't received a message.

    def receive_batch(self, on_event_batch, **kwargs):
        # other parameters
        :keyword float max_wait_time: The maximum interval in seconds that the event processor will wait before
         calling the callback.
         If no events are received within this interval, the `on_event_batch` callback will be called
         with an empty list.
        # ...

     def receive(self, on_event, **kwargs):
        # other parameters
        :keyword float max_wait_time: The maximum interval in seconds that the event processor will wait before calling
         the callback. If no events are received within this interval, the `on_event` callback will be called with
         `None`.
         If this value is set to `None` or 0 (the default), the callback will not be called until an event is received.
        # ...

I tried this pattern and it worked for me:

import os
import time
import functools

from azure.eventhub import EventHubConsumerClient

last_receive_none_time = None
duration_to_close_after_not_receiving_events = 15


def on_event(consumer_client, partition_context, event):
    global last_receive_none_time
    global duration_to_close_after_not_receiving_events
    if not event:
        print('not receiving event in partition {}'.format(partition_context.partition_id))
        if not last_receive_none_time:
            last_receive_none_time = time.time()
        else:
            cur_time = time.time()
            not_receiving_event_duration = cur_time - last_receive_none_time
            if not_receiving_event_duration > duration_to_close_after_not_receiving_events:
                consumer_client.close()
    else:
        print('event received in partition {}'.format(partition_context.partition_id))
        last_receive_none_time = None  # reset the timer


if __name__ == '__main__':
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group='$Default',
        eventhub_name=EVENTHUB_NAME,
    )

    try:
        with consumer_client:
            on_event_callback_with_consumer_client = functools.partial(on_event, consumer_client)
            consumer_client.receive(
                on_event=on_event_callback_with_consumer_client,
                starting_position="-1",  # "-1" is from the beginning of the partition.
                max_wait_time=5,
            )
    except KeyboardInterrupt:
        print('Stopped receiving.')

Let me know if this gives you some insights or we could discuss this further.

Upvotes: 1

CSA
CSA

Reputation: 43

wont let me comment, but I had the same issue. The builin azure idle timeout seems to do nothing.

Serkant's example worked for me. Just replace (RECEIVE_DURATION) with an int... i.e 15 seconds. oh and import threading

Upvotes: 0

Serkant Karaca
Serkant Karaca

Reputation: 2034

.receive is a blocking call. Try calling it in a separate thread so you can close the consumer in the main thread. You can craft below snippet to track last received time and take close decision if it has been a while since the last event received.

   thread = threading.Thread(
        target=consumer_client.receive,
        kwargs={
            "on_event": on_event,
            "on_partition_initialize": on_partition_initialize,
            "on_partition_close": on_partition_close,
            "on_error": on_error,
            "starting_position": "-1",  # "-1" is from the beginning of the partition.
        }
    )

    thread.daemon = True
    thread.start()
    time.sleep(RECEIVE_DURATION)
    consumer_client.close()
    thread.join()

Upvotes: 1

Related Questions