Why does the Azure Eventhub Python library throw KeyError: 'all-partitions' when the batch reaches its maximum size?


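We are upgrading some scripts that use the Python libraries for Azure Event Hub to the latest version (5.0). I am mostly following the examples in the documentation section titled "Publish events to an Event Hub". When I first read the code I found it odd that it relies on hitting a ValueError exception; that doesn't seem like the best design, but I went along with it. I'm including the sample code here so readers don't have to switch tabs: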

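# THIS IS THE EXAMPLE CODE FROM MICROSOFT
from azure.eventhub import EventData

# `client` is an EventHubProducerClient created earlier in the sample
event_data_batch = client.create_batch()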
can_add = True
while can_add:
    try:
        event_data_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        can_add = False  # EventDataBatch object reaches max_size.

with client:
    client.send_batch(event_data_batch)
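The sample's control flow can be tried without Azure. The sketch below uses a hypothetical ToyBatch that accepts a fixed number of events instead of a byte limit, and assumes that add() checks the limit before storing the event. What it shows is that the event which triggers ValueError is never in the batch: the sample doesn't care because it adds the same message over and over, but a loop over real data has to put that event into the next batch.

```python
class ToyBatch:
    """Hypothetical stand-in for EventDataBatch: holds at most `max_events` items."""

    def __init__(self, max_events):
        self.max_events = max_events
        self.events = []

    def add(self, event):
        # Assumption: the limit is checked BEFORE the event is stored
        if len(self.events) >= self.max_events:
            raise ValueError("batch has reached its size limit")
        self.events.append(event)


batch = ToyBatch(max_events=3)
can_add = True
attempts = 0
while can_add:
    try:
        attempts += 1
        batch.add("Message inside EventBatchData")
    except ValueError:
        can_add = False  # the rejected (4th) event is NOT in the batch

print(len(batch.events), attempts)  # prints: 3 4
```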

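We query various APIs and then send that data to Event Hub, so I already have a for loop that iterates over the events and sends them one at a time. We want batching to make this faster and more efficient. This is how I integrated the example into the for loop: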

# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)


if "eventhub" in self.output_config.keys():
    if self.output_config['eventhub'] is True:
        if events:
            i = 0
            event_data_batch = self.output_client.create_batch()
            for event in events:
                try:
                    event_data_batch.add(EventData(json.dumps(event)))
                except ValueError:  # EventDataBatch object reaches max_size.
                    # Ship events
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    # Set up the next batch
                    event_data_batch = self.output_client.create_batch()
                except Exception as e:
                    self.output_error = True
                    self.logger.error("Error shipping event to EventHub: {}".format(e))
                    i += 1

        if not self.output_error:
            if events:
                with self.output_client:
                    self.output_client.send_batch(event_data_batch)
            self.logger.info("Sent %d events" % (len(events)))
        else:
            self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))

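Note that we also ship the events in the if not self.output_error block, because sometimes we never reach the max-size ValueError that the example relies on. In testing, everything works as long as we stay under the limit, but once we hit the max size we get this error (which we have not been able to resolve yet):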

2020-03-02 12:59:43,697 - DEBUG - o365-dev - Period is 30
2020-03-02 12:59:43,699 - DEBUG - o365-dev - Output handling 1952 events.

Traceback (most recent call last):
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 230, in output
    event_data_batch.add(EventData(json.dumps(event)))
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_common.py", line 364, in add
    self.max_size_in_bytes
ValueError: EventDataBatch has reached its size limit: 1046528

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 216, in send_batch
    cast(EventHubProducer, self._producers[partition_id]).send(
KeyError: 'all-partitions'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "cloud-connector.py", line 175, in <module>
    main()
  File "cloud-connector.py", line 171, in main
    cloud.setup_connections()
  File "cloud-connector.py", line 135, in setup_connections
    self.connections[conn['name']] = self.modules[conn['module']].Module(conn['name'], self.config['output'], loglevel=self.logger.getEffectiveLevel())
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 89, in __init__
    self.run()
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 173, in run
    self.output(events)
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 234, in output
    self.output_client.send_batch(event_data_batch)
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 220, in send_batch
    self._start_producer(partition_id, send_timeout)
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 126, in _start_producer
    not self._producers[partition_id]
KeyError: 'all-partitions'
Answer

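@jthack, "with self.output_client:" closes output_client when the block finishes. You enter it more than once (in the except ValueError branch inside the loop, and again for the final send), so the second time you use the client it has already been closed and is in a bad state. I suggest putting all of your code inside a single with statement: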
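To see why re-entering the with statement fails, here is a toy model. ToyProducerClient is hypothetical and is NOT the azure-eventhub source: it assumes that leaving the with block calls close(), and that close() empties a partition-to-producer map. That assumption is consistent with the self._producers[partition_id] lookup and the KeyError: 'all-partitions' in the question's traceback, but the real internals may differ.

```python
class ToyProducerClient:
    """Toy stand-in, not the real library: close() discards the producers."""

    def __init__(self):
        # Assumption: one producer is registered under "all-partitions" at startup
        self._producers = {"all-partitions": "producer"}

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()  # leaving the with block always closes the client

    def close(self):
        self._producers.clear()

    def send_batch(self, batch, partition_id="all-partitions"):
        # Raises KeyError once close() has emptied the map
        return self._producers[partition_id], batch


client = ToyProducerClient()
with client:
    client.send_batch(["first batch"])  # works: the client is still open
try:
    with client:
        client.send_batch(["second batch"])  # the first exit already closed it
except KeyError as err:
    print("KeyError:", err)  # prints: KeyError: 'all-partitions'
```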

# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)

with self.output_client:
    if "eventhub" in self.output_config.keys():
        if self.output_config['eventhub'] is True:
            if events:
                i = 0
                event_data_batch = self.output_client.create_batch()
                for event in events:
                    try:
                        event_data_batch.add(EventData(json.dumps(event)))
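                    except ValueError:  # EventDataBatch object reaches max_size.
                        # Ship events
                        self.output_client.send_batch(event_data_batch)
                        # Set up the next batch and re-add the event that did not fit;
                        # otherwise that event is silently dropped
                        event_data_batch = self.output_client.create_batch()
                        event_data_batch.add(EventData(json.dumps(event)))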
                    except Exception as e:
                        self.output_error = True
                        self.logger.error("Error shipping event to EventHub: {}".format(e))
                        i += 1

            if not self.output_error:
                if events:
                    self.output_client.send_batch(event_data_batch)
                self.logger.info("Sent %d events" % (len(events)))
            else:
                self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
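In a loop like this, the event that triggers ValueError is not in the full batch, so it has to go into the new batch or it is lost, and the leftover batch has to be sent at the end. A minimal sketch of that logic as a standalone helper follows. send_in_batches and make_event are hypothetical names; the helper assumes only what the question's code already uses: create_batch(), send_batch(batch), and an add() that raises ValueError without storing the event when the batch is full.

```python
import json
from typing import Any, Callable, Iterable


def send_in_batches(client: Any, events: Iterable[Any],
                    make_event: Callable[[str], Any]) -> int:
    """Send all events, starting a new batch whenever one fills up.

    Returns the number of batches sent. The caller opens and closes the
    client exactly once (e.g. one surrounding `with` block).
    """
    batch = client.create_batch()
    in_batch = 0  # events in the current, not-yet-sent batch
    batches_sent = 0
    for event in events:
        payload = json.dumps(event)
        try:
            batch.add(make_event(payload))
        except ValueError:
            if in_batch == 0:
                raise  # even an empty batch cannot hold this single event
            # Current batch is full: ship it and start a fresh one
            client.send_batch(batch)
            batches_sent += 1
            batch = client.create_batch()
            # Re-add the rejected event; if it is too large for an empty
            # batch, this second ValueError propagates to the caller.
            batch.add(make_event(payload))
            in_batch = 0
        in_batch += 1
    # Flush the remainder, which usually never triggers ValueError
    if in_batch:
        client.send_batch(batch)
        batches_sent += 1
    return batches_sent
```

With the real library the intended call would be along the lines of: with client: send_in_batches(client, events, EventData), where client comes from EventHubProducerClient.from_connection_string(...) as in the question. Counting events locally (in_batch) avoids depending on any size or length attribute of the batch object.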