jthack

Why does the Azure Event Hub Python library throw KeyError: 'all-partitions' when the batch reaches its maximum size?

We are upgrading some scripts that use the Python library for Azure Event Hubs to the latest version (5.0). I'm mostly following the example in the docs titled "Publish events to an Event Hub". When I first read the code, I thought it was interesting that it relies on hitting a ValueError exception; that didn't seem like the best design. But anyway, I went with it. I'll put that example code here to limit tab-switching for readers:

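# THIS IS THE EXAMPLE CODE FROM MICROSOFT
from azure.eventhub import EventHubProducerClient, EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

event_data_batch = client.create_batch()
can_add = True
while can_add:
    try:
        event_data_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        can_add = False  # EventDataBatch object reaches max_size.

with client:
    client.send_batch(event_data_batch)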
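The example above fills a single batch and stops at the first ValueError. For a stream of events, the same signal can drive a fill/flush loop: send the full batch, start a new one, retry the event that did not fit, and flush whatever is left at the end. The sketch below runs without Azure. FakeBatch and ship_in_batches are hypothetical names, and measuring size as the UTF-8 byte length of each string is a simplifying assumption; the real EventDataBatch computes its size in its own way.

```python
from typing import Callable, List


class FakeBatch:
    """Hypothetical stand-in for EventDataBatch: add() raises ValueError when full."""

    def __init__(self, max_size_in_bytes: int) -> None:
        self.max_size_in_bytes = max_size_in_bytes
        self.size_in_bytes = 0
        self.items: List[str] = []

    def add(self, item: str) -> None:
        # Simplified size model (assumption): UTF-8 byte length of the payload
        size = len(item.encode("utf-8"))
        if self.size_in_bytes + size > self.max_size_in_bytes:
            raise ValueError("batch has reached its size limit")
        self.items.append(item)
        self.size_in_bytes += size


def ship_in_batches(events: List[str], max_size: int,
                    send: Callable[[FakeBatch], None]) -> None:
    """Fill a batch until add() raises ValueError, send it, and keep going."""
    batch = FakeBatch(max_size)
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            # Batch is full: send it, start a new one, and retry this event
            # so it is not dropped. An event larger than max_size on its own
            # would raise again here.
            send(batch)
            batch = FakeBatch(max_size)
            batch.add(event)
    # Send the remainder, even if the size limit was never reached
    if batch.items:
        send(batch)


sent: List[List[str]] = []
ship_in_batches(["aaaa", "bbbb", "cccc", "dd"], max_size=8,
                send=lambda b: sent.append(b.items))
print(sent)  # [['aaaa', 'bbbb'], ['cccc', 'dd']]
```

Retrying the overflowing event in the new batch is the important detail: without it, the event that triggered the ValueError is silently lost.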

So, we query different APIs and then ship that data off to Event Hub, so I already had a for loop that iterates over the events and ships them one at a time. Our hope was that batching would make this much faster and more efficient. Here's how I integrated the example into our for loop:

# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)


if "eventhub" in self.output_config.keys():
    if self.output_config['eventhub'] is True:
        if events:
            i = 0
            event_data_batch = self.output_client.create_batch()
            for event in events:
                try:
                    event_data_batch.add(EventData(json.dumps(event)))
                except ValueError:  # EventDataBatch object reaches max_size.
                    # Ship events
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    # Set up the next batch
                    event_data_batch = self.output_client.create_batch()
                except Exception as e:
                    self.output_error = True
                    self.logger.error("Error shipping event to EventHub: {}".format(e))
                    i += 1

        if not self.output_error:
            if events:
                with self.output_client:
                    self.output_client.send_batch(event_data_batch)
            self.logger.info("Sent %d events" % (len(events)))
        else:
            self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))

Notice that we also ship the events in the if not self.output_error block, because we might never hit the max-size ValueError that the example relies on. Anyway, when testing this, everything works if we DON'T hit the limit, but if we do hit the max size, we get this error (which we have not been able to solve yet):

2020-03-02 12:59:43,697 - DEBUG - o365-dev - Period is 30
2020-03-02 12:59:43,699 - DEBUG - o365-dev - Output handling 1952 events.

Traceback (most recent call last):
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 230, in output
    event_data_batch.add(EventData(json.dumps(event)))
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_common.py", line 364, in add
    self.max_size_in_bytes
ValueError: EventDataBatch has reached its size limit: 1046528

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 216, in send_batch
    cast(EventHubProducer, self._producers[partition_id]).send(
KeyError: 'all-partitions'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "cloud-connector.py", line 175, in <module>
    main()
  File "cloud-connector.py", line 171, in main
    cloud.setup_connections()
  File "cloud-connector.py", line 135, in setup_connections
    self.connections[conn['name']] = self.modules[conn['module']].Module(conn['name'], self.config['output'], loglevel=self.logger.getEffectiveLevel())
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 89, in __init__
    self.run()
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 173, in run
    self.output(events)
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 234, in output
    self.output_client.send_batch(event_data_batch)
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 220, in send_batch
    self._start_producer(partition_id, send_timeout)
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 126, in _start_producer
    not self._producers[partition_id]
KeyError: 'all-partitions'

Answers (1)

CormorantX

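@jthack, "with self.output_client:" closes output_client when the block finishes. You use it more than once (inside the loop and again at the end), so after the first batch is sent the client is already closed, and the next send_batch call runs against a client in the wrong state, which is where the KeyError comes from. I suggest putting all of the code inside a single with statement: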

# Corrected version of the custom script: a single `with` block, so the client
# is closed only once, after every batch has been sent.
# Requires at module level: import json
#                           from azure.eventhub import EventHubProducerClient, EventData
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)

with self.output_client:
    if "eventhub" in self.output_config.keys():
        if self.output_config['eventhub'] is True:
            if events:
                i = 0
                event_data_batch = self.output_client.create_batch()
                for event in events:
                    event_data = EventData(json.dumps(event))
                    try:
                        event_data_batch.add(event_data)
                    except ValueError:  # EventDataBatch object reaches max_size.
                        # Ship the full batch
                        self.output_client.send_batch(event_data_batch)
                        # Set up the next batch and add the event that did not fit,
                        # otherwise that event would be silently dropped
                        event_data_batch = self.output_client.create_batch()
                        event_data_batch.add(event_data)
                    except Exception as e:
                        self.output_error = True
                        self.logger.error("Error shipping event to EventHub: {}".format(e))
                        i += 1

            if not self.output_error:
                if events:
                    # Send the last, partially filled batch
                    self.output_client.send_batch(event_data_batch)
                self.logger.info("Sent %d events" % (len(events)))
            else:
                self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
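The failure mode described above can be reproduced without Azure. The sketch below uses a hypothetical FakeProducerClient (not part of azure-eventhub) whose close() discards its internal per-partition producers. That is a simplified assumption about how the real client behaves once closed, chosen only to reproduce the same KeyError: 'all-partitions' symptom seen in the traceback; it is not the library's code.

```python
from typing import Dict, List


class FakeProducerClient:
    """Hypothetical stand-in, NOT azure.eventhub.EventHubProducerClient."""

    def __init__(self) -> None:
        # Assumption: producers are kept in a dict keyed by partition id
        self._producers: Dict[str, str] = {"all-partitions": "producer"}
        self.sent = 0

    def __enter__(self) -> "FakeProducerClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Leaving a `with` block always closes the client
        self.close()

    def close(self) -> None:
        # Closing discards the producers; nothing recreates them afterwards
        self._producers = {}

    def send_batch(self, batch: List[int]) -> None:
        # On a closed client this lookup raises KeyError('all-partitions')
        _ = self._producers["all-partitions"]
        self.sent += len(batch)


# Question's pattern: a separate `with` block for each send
first = FakeProducerClient()
error = None
with first:
    first.send_batch([1, 2])   # succeeds, then the client is closed
try:
    with first:
        first.send_batch([3])  # reuses the closed client
except KeyError as exc:
    error = exc
print(repr(error))  # KeyError('all-partitions')

# Answer's pattern: one `with` block around all sends
second = FakeProducerClient()
with second:
    second.send_batch([1, 2])
    second.send_batch([3])
print(second.sent)  # 3
```

The design point is that the with block marks the end of the client's lifetime, so it should enclose every send_batch call rather than each individual one.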
