Reputation: 173
I am receiving data through azure eventhub and trying to close if I am receiving no more data for like 10 or 15 seconds.
I tried to implement arguments auth_timeout
and idel_timeout
in _consumer_client, but neither worked.
I am also referring to this example.
There's "on_error" function that may function to close the client when there's no further message being consumed.
def on_event(partition_context, event):
## My Code##
# Put your code here. to do some operations on the event.
print("Received event from partition {}.".format(partition_context.partition_id))
print("Last enqueued event properties from partition: {} is: {}.".format(
partition_context.partition_id,t))
def on_error(partition_context, error):
# Put your code here. partition_context can be None in the on_error callback.
if partition_context:
print("An exception: {} occurred during receiving from Partition: {}.".format(
partition_context.partition_id,
error
))
else:
print("An exception: {} occurred during the load balance process.".format(error))
consumer_client = EventHubConsumerClient.from_connection_string(conn_str=CONNECTION_STR,consumer_group='forceconsummer',eventhub_name=EVENTHUB_NAME, idle_timeout = 30, auth_timeout = 10)
consumer_client.receive(on_event=on_event, partition_id = "2", track_last_enqueued_event_properties=False, on_error=on_error, starting_position="@latest")
How can I make it work to be closed automatically after a timeout?
Upvotes: 0
Views: 1336
Reputation: 126
I think you could utilize the on_event
or on_event_batch
callback to achieve the goal.
There is a max_wait_time
key-word argument that you could pass to the receive
or receive_batch
method. You could use None/Empty array as an indicator to track how long you haven't received a message.
def receive_batch(self, on_event_batch, **kwargs):
# other parameters
:keyword float max_wait_time: The maximum interval in seconds that the event processor will wait before
calling the callback.
If no events are received within this interval, the `on_event_batch` callback will be called
with an empty list.
# ...
def receive(self, on_event, **kwargs):
# other parameters
:keyword float max_wait_time: The maximum interval in seconds that the event processor will wait before calling
the callback. If no events are received within this interval, the `on_event` callback will be called with
`None`.
If this value is set to `None` or 0 (the default), the callback will not be called until an event is received.
# ...
I tried this pattern and it worked for me:
import os
import time
import functools
from azure.eventhub import EventHubConsumerClient
last_receive_none_time = None
duration_to_close_after_not_receiving_events = 15
def on_event(consumer_client, partition_context, event):
global last_receive_none_time
global duration_to_close_after_not_receiving_events
if not event:
print('not receiving event in partition {}'.format(partition_context.partition_id))
if not last_receive_none_time:
last_receive_none_time = time.time()
else:
cur_time = time.time()
not_receiving_event_duration = cur_time - last_receive_none_time
if not_receiving_event_duration > duration_to_close_after_not_receiving_events:
consumer_client.close()
else:
print('event received in partition {}'.format(partition_context.partition_id))
last_receive_none_time = None # reset the timer
if __name__ == '__main__':
consumer_client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STR,
consumer_group='$Default',
eventhub_name=EVENTHUB_NAME,
)
try:
with consumer_client:
on_event_callback_with_consumer_client = functools.partial(on_event, consumer_client)
consumer_client.receive(
on_event=on_event_callback_with_consumer_client,
starting_position="-1", # "-1" is from the beginning of the partition.
max_wait_time=5,
)
except KeyboardInterrupt:
print('Stopped receiving.')
Let me know if this gives you some insights or we could discuss this further.
Upvotes: 1
Reputation: 43
wont let me comment, but I had the same issue. The builin azure idle timeout seems to do nothing.
Serkant's example worked for me. Just replace (RECEIVE_DURATION) with an int... i.e 15 seconds. oh and import threading
Upvotes: 0
Reputation: 2034
.receive is a blocking call. Try calling it in a separate thread so you can close the consumer in the main thread. You can craft below snippet to track last received time and take close decision if it has been a while since the last event received.
thread = threading.Thread(
target=consumer_client.receive,
kwargs={
"on_event": on_event,
"on_partition_initialize": on_partition_initialize,
"on_partition_close": on_partition_close,
"on_error": on_error,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
thread.daemon = True
thread.start()
time.sleep(RECEIVE_DURATION)
consumer_client.close()
thread.join()
Upvotes: 1