We are upgrading some scripts that use the Python libraries for Azure Event Hubs to the latest version (5.0). I am mainly following the documentation example titled Publish events to an Event Hub. On first reading the code, I found it curious that it relies on hitting a ValueError exception — that doesn't seem like the best design — but I went along with it. I'll paste the example code here to spare readers a tab switch:
# THIS IS THE EXAMPLE CODE FROM MICROSOFT
event_data_batch = client.create_batch()
can_add = True
while can_add:
    try:
        event_data_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        can_add = False  # EventDataBatch object reaches max_size.

with client:
    client.send_batch(event_data_batch)
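To make that control flow concrete, here is a runnable toy version of the fill-until-ValueError pattern. MockBatch is a hypothetical stand-in for EventDataBatch; the real SDK's add() raises ValueError once max_size_in_bytes would be exceeded.

```python
class MockBatch:
    """Hypothetical stand-in for EventDataBatch: add() raises ValueError
    once the batch would exceed max_size_in_bytes."""
    def __init__(self, max_size_in_bytes=64):
        self.max_size_in_bytes = max_size_in_bytes
        self.events = []
        self.size = 0

    def add(self, event):
        if self.size + len(event) > self.max_size_in_bytes:
            raise ValueError("MockBatch has reached its size limit")
        self.events.append(event)
        self.size += len(event)

batch = MockBatch()
can_add = True
while can_add:
    try:
        batch.add("Message inside EventBatchData")
    except ValueError:
        can_add = False  # batch is full, same signal the real SDK uses

print(len(batch.events))  # -> 2 (two 29-byte events fit under the 64-byte cap)
```

The loop only stops when add() throws, which is why the real example depends on actually reaching the size limit.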
We query various APIs and then send that data to Event Hub, so I already have a for loop that iterates over the events and sends them one at a time. We're hoping batching will make this faster and more efficient. Here is how I integrated the example into the for loop:
# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)

if "eventhub" in self.output_config.keys():
    if self.output_config['eventhub'] is True:
        if events:
            i = 0
            event_data_batch = self.output_client.create_batch()
            for event in events:
                try:
                    event_data_batch.add(EventData(json.dumps(event)))
                except ValueError:  # EventDataBatch object reaches max_size.
                    # Ship events
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    # Set up the next batch
                    event_data_batch = self.output_client.create_batch()
                except Exception as e:
                    self.output_error = True
                    self.logger.error("Error shipping event to EventHub: {}".format(e))
                i += 1
            if not self.output_error:
                if events:
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    self.logger.info("Sent %d events" % (len(events)))
            else:
                self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
Notice how we moved the final send into the if not self.output_error block, because sometimes we may never reach the max-size ValueError that the example relies on. In any case, while testing, everything works fine if we don't hit the limit, but once we reach the max size we get this error (which we haven't been able to solve yet):
2020-03-02 12:59:43,697 - DEBUG - o365-dev - Period is 30
2020-03-02 12:59:43,699 - DEBUG - o365-dev - Output handling 1952 events.
Traceback (most recent call last):
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 230, in output
event_data_batch.add(EventData(json.dumps(event)))
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_common.py", line 364, in add
self.max_size_in_bytes
ValueError: EventDataBatch has reached its size limit: 1046528
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 216, in send_batch
cast(EventHubProducer, self._producers[partition_id]).send(
KeyError: 'all-partitions'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "cloud-connector.py", line 175, in <module>
main()
File "cloud-connector.py", line 171, in main
cloud.setup_connections()
File "cloud-connector.py", line 135, in setup_connections
self.connections[conn['name']] = self.modules[conn['module']].Module(conn['name'], self.config['output'], loglevel=self.logger.getEffectiveLevel())
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 89, in __init__
self.run()
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 173, in run
self.output(events)
File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 234, in output
self.output_client.send_batch(event_data_batch)
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 220, in send_batch
self._start_producer(partition_id, send_timeout)
File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 126, in _start_producer
not self._producers[partition_id]
KeyError: 'all-partitions'
@jthack, "with self.output_client:" closes the output_client once the block completes. You use it twice, so on the second use you are attempting to operate on an already-closed client, and the client is in a bad state. I suggest you put all of the code inside a single with statement:
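To see the closing behavior in isolation, here is a toy illustration; FakeProducerClient is hypothetical, but it mimics the relevant part of the SDK client: __exit__ closes the client, and a closed client can no longer send.

```python
class FakeProducerClient:
    """Hypothetical stand-in for EventHubProducerClient, showing only the
    context-manager behavior: leaving the `with` block closes the client."""
    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

    def close(self):
        self.closed = True

    def send_batch(self, batch):
        if self.closed:
            raise RuntimeError("client is closed")

client = FakeProducerClient()
with client:
    client.send_batch([])   # works: client is still open

try:
    with client:            # the first `with` already closed the client
        client.send_batch([])
except RuntimeError as e:
    print(e)                # -> client is closed
```

That is exactly what happens with two separate "with self.output_client:" blocks — the first one shuts the connection down before the second ever runs.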
# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)

with self.output_client:
    if "eventhub" in self.output_config.keys():
        if self.output_config['eventhub'] is True:
            if events:
                i = 0
                event_data_batch = self.output_client.create_batch()
                for event in events:
                    try:
                        event_data_batch.add(EventData(json.dumps(event)))
                    except ValueError:  # EventDataBatch object reaches max_size.
                        # Ship events
                        self.output_client.send_batch(event_data_batch)
                        # Set up the next batch
                        event_data_batch = self.output_client.create_batch()
                    except Exception as e:
                        self.output_error = True
                        self.logger.error("Error shipping event to EventHub: {}".format(e))
                    i += 1
                if not self.output_error:
                    if events:
                        self.output_client.send_batch(event_data_batch)
                        self.logger.info("Sent %d events" % (len(events)))
                else:
                    self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
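One subtlety that may still bite, separate from the closed-client issue: when add() raises ValueError, the event that triggered it was never added to the full batch, so it should be re-added to the fresh batch or it will be silently dropped. A runnable sketch of that pattern, using a hypothetical in-memory TinyBatch as a stand-in for EventDataBatch:

```python
class TinyBatch:
    """Hypothetical stand-in for EventDataBatch: raises ValueError when full."""
    def __init__(self, max_size_in_bytes=10):
        self.max_size_in_bytes = max_size_in_bytes
        self.items, self.size = [], 0

    def add(self, item):
        if self.size + len(item) > self.max_size_in_bytes:
            raise ValueError("batch full")
        self.items.append(item)
        self.size += len(item)

sent = []  # stands in for send_batch() calls

batch = TinyBatch()
for event in ["aaaa", "bbbb", "cccc", "dddd"]:
    try:
        batch.add(event)
    except ValueError:
        sent.append(batch.items)   # ship the full batch
        batch = TinyBatch()        # start a new one
        batch.add(event)           # re-add the event that overflowed
if batch.items:
    sent.append(batch.items)       # ship the final partial batch

print(sent)  # -> [['aaaa', 'bbbb'], ['cccc', 'dddd']]
```

Without the re-add line, "cccc" would vanish: it raised the ValueError but never made it into either batch.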