dernat71

Reputation: 467

Azure Eventhub Python SDK : batch of everything in time window from all partitions

I'm currently trying to implement batch reading from an Azure EventHub created with 32 partitions. More precisely, I'm trying to read a single batch that includes all the events received during the last 60 minutes.

The problem is that the consumer.receive_batch() method invokes the on_event_batch() callback once per batch within a single partition. For example, 6 events in partition 6 trigger one call while 7 events in partition 9 trigger another. I'd like one call of the callback covering the events from every partition. I'm currently using something like this to put the EventHubConsumerClient into listening mode:


self.consumer = EventHubConsumerClient.from_connection_string(
    conn_str=conn_str,
    consumer_group=consumer_group,
    eventhub_name=eventhub_name
)

with self.consumer as consumer:
    consumer.receive_batch(
        on_event_batch=on_event_callback_method,
        starting_position=timestamp_60_minutes_ago,
        starting_position_inclusive=True,
        max_batch_size=999999999999,
        max_wait_time=60  # give the receiver time to gather all the messages
    )

I'm currently thinking about spawning N threads (here N being 32), one per partition, then reducing their results into one unified list of events, but I'm not really sure how to proceed, or whether I'm heading down a rabbit hole. I'd be happy to have your view on that! Our current implementation relies on Databricks' EventHub support (which seems able to do this), but we'd like to move away from it and use the official SDKs.

Upvotes: 0

Views: 1023

Answers (2)

dernat71

Reputation: 467

For posterity (and thanks to @Kibrantn's help), I finally ended up using the Threading/Queue pattern roughly as follows. This spins up one thread per available partition, receives concurrently, aggregates into a thread-safe Queue, does so for N seconds before close()-ing the receiver, and finally flattens everything into a single list:

import logging
import threading
import time
from queue import Queue

from azure.eventhub import EventHubConsumerClient

logger = logging.getLogger(__name__)


class ReceiverClass:

    def _callback_process_data(self, partition_context, events):
        """
        Generic method used as a processing callback for all the events batches
        captured. This follows the Transform => ML => Post downstream workflow.
        """
        # Aggregate data into the aggregation Queue
        self.events_aggregation_queue.put(events)

    def receive_data(self):

        # Initialize the consumer
        self.consumer = EventHubConsumerClient.from_connection_string(
                    conn_str=self.config.connection_string,
                    consumer_group=self.config.consumer_group,
                    eventhub_name=self.config.eventhub_name
                )

        # Initialize the aggregation queue to gather all the EventData together (the callback takes care of that)
        self.events_aggregation_queue = Queue()

        # Create a reception thread for each partition
        workers = []
        for partition_id in self.consumer.get_partition_ids():
            worker = threading.Thread(
                target=self.consumer.receive_batch,
                kwargs={
                    "on_event_batch": self._callback_process_data,
                    "starting_position": self.config.data_window.receiving_from_time,
                    "starting_position_inclusive": True,
                    "partition_id": partition_id,
                },
            )
            worker.start()
            workers.append(worker)

        # Aggregate for N seconds, then close the receiver and wait for the workers
        time.sleep(self.config.aggregation_wait_time_in_seconds)
        self.consumer.close()
        for worker in workers:
            worker.join()

        # Aggregate the data from the async Queue
        events_data_nested = [self.events_aggregation_queue.get() for _ in range(self.events_aggregation_queue.qsize())]
        events_data = [event_data for sublist in events_data_nested for event_data in sublist]
        logger.info(f"Received a list of {len(events_data)} EventData...")

        return events_data
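
One detail worth noting in the code above: draining via qsize() is only safe because every receiver thread has stopped producing before the drain. A defensive alternative that doesn't rely on qsize() could look like the sketch below (drain_and_flatten is an illustrative helper name, not part of the SDK):

```python
from queue import Empty, Queue

def drain_and_flatten(q: Queue) -> list:
    # Pull every batch currently sitting in the queue and
    # flatten them into a single list of events.
    events = []
    while True:
        try:
            events.extend(q.get_nowait())
        except Empty:
            return events

q = Queue()
q.put(["a", "b"])
q.put(["c"])
print(drain_and_flatten(q))  # ['a', 'b', 'c']
```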

Upvotes: 1

Kibrantn

Reputation: 609

What you've suggested isn't off in left field, but it might be making more work for yourself than needed if you're willing to do the sort of aggregation you've described slightly differently.

Rather than spawning 32 consumers, for which you'd potentially need to manage failure modes, lifecycles, thread management and all that overhead, why not do the reduction into a single list you're describing, but via the single Event Hub consumer you have now? queue.Queue from the standard-library queue module should give you all the synchronization logic you need out of the box, letting a single worker handle the aggregation by reading out of that funnel.
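
A minimal, SDK-free sketch of this single-funnel idea: per-partition callbacks feed one thread-safe queue, and a single worker drains it into a flat list (the on_event_batch function below is a stand-in for the SDK callback; all names are illustrative):

```python
import threading
from queue import Empty, Queue

aggregation_queue: Queue = Queue()

def on_event_batch(partition_id, events):
    # Stand-in for the SDK's per-partition callback: every batch,
    # whatever its partition, lands in the same thread-safe queue.
    aggregation_queue.put(events)

def aggregate(stop_event, results):
    # Single worker draining the funnel into one flat list.
    while not stop_event.is_set() or not aggregation_queue.empty():
        try:
            results.extend(aggregation_queue.get(timeout=0.1))
        except Empty:
            continue

stop = threading.Event()
all_events = []
worker = threading.Thread(target=aggregate, args=(stop, all_events))
worker.start()

# Simulate batches arriving from three partitions.
for pid in range(3):
    on_event_batch(pid, [f"event-{pid}-{i}" for i in range(2)])

stop.set()
worker.join()
print(len(all_events))  # 6
```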

I'll be upfront that this doesn't offer a huge semantic difference from just doing your logic in the per-partition callbacks, since they'd feed into your aggregation queue in roughly the same order; but if your goal is to process larger contiguous windows, or to see the cross-partition spread in one logical read, the above will let you achieve that. (As would your proposal, for the record, but it would require more thread-and-client management, as opposed to just one producer and one consumer.)

Don't hesitate to let me know if this doesn't address your question or needs more clarity. Full disclosure: I'm one of the maintainers of the Event Hubs Python SDK, and you can always feel free to reach out to us on our GitHub by opening an issue.

Upvotes: 2
