在本地调试Python azure函数应用程序时,该应用程序处理来自具有32个分区的Azure Eventhub的事件,我想知道是否可以一一处理这些事件。 看来我设置的任何设置来限制到达的事件数量总是等于分区数量,在本例中为 32。
我的
host.json
:
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"eventHubs": {
"batchCheckpointFrequency": 1,
"eventProcessorOptions": {
"maxBatchSize": 1,
"prefetchCount": 1,
"maxConcurrentCalls": 1
},
"initialOffsetOptions": {
"type": "fromEnd"
}
}
}
}
功能应用程序代码:
@app.event_hub_message_trigger(arg_name="azeventhub",
event_hub_name="eventhubname",
connection="EventHubConnectionString",
cardinality=func.Cardinality.ONE,
consumer_group="consumer_group_x")
async def event_processor(azeventhub: func.EventHubEvent):
event = azeventhub.get_body().decode('utf-8')
logging.info("Received from EventHub: %s", event)
try:
json_event = json.loads(event)
await process_event(json_event)
except Exception as e:
logging.error("Error while processing event: %s", e)
time.sleep(10) # it does not wait after each event
在上面的代码中,睡眠仅在处理了 32 个事件后才执行,因此我假设它同时睡眠 32 次 10 秒。 当我使代码同步时,会发生相同的行为。
这在设计上是不可能的还是有确实可以强制这样做的设置?
从一批事件中逐一处理事件。如本
MS Doc中所述,将
cardinality
设置为 one
。
我使用服务总线客户端将事件作为消息一一接收。
这段代码对我有用
function_app.py
:
import azure.functions as func
import logging
import json
import os
from azure.servicebus import ServiceBusClient, ServiceBusMessage
app = func.FunctionApp()
conn = os.getenv("ServiceBus_String")
queue = "queue_event"
def send_message_to_service_bus_queue(message):
servicebus_client = ServiceBusClient.from_connection_string(conn)
sender = servicebus_client.get_queue_sender(queue_name=queue)
sender.send_messages(ServiceBusMessage(message))
@app.event_hub_message_trigger(arg_name="azeventhub", event_hub_name="myeventhub",
connection="EventHub_string",cardinality=func.Cardinality.ONE)
def main(azeventhub: func.EventHubEvent):
events = json.loads(azeventhub.get_body().decode("utf-8"))
for event in events:
send_message_to_service_bus_queue(json.dumps(event))
logging.info("processed message: %s",event)
local.settings.json
:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
"EventHub_string": "Endpoint=sb://azevent.servicebus.windows.net/;SharedAccessKeyName=xxxxxxxxxxxxx;SharedAccessKey=xxxxxx",
"ServiceBus_String":"Endpoint=sb://event-servicebus.servicebus.windows.net/;SharedAccessKeyName=xxxxxxxxxxx;SharedAccessKey=xxxxxxxxxxxxxx"
}
}
INPUT
:[
{
"Name": "Vivek",
"Contact": "0000-000-000"
},
{
"Name": "Vaibhav",
"Contact": "1111-111-111"
},
{
"Name": "Shandilya",
"Contact": "2222-222-222"
},
{
"Name": "Arjun",
"Contact": "3333-333-333"
},
{
"Name": "Karam",
"Contact": "4444-444-444"
}
]
OUTPUT
:事件被一一处理并作为一条消息一一发送到服务总线:
Functions:
main: eventHubTrigger
For detailed output, run func with --verbose flag.
[2024-02-14T12:00:12.883Z] Host lock lease acquired by instance ID '000000000000000000000000116B5F66'.
[2024-02-14T12:00:17.956Z] Executing 'Functions.main' (Reason='(null)', Id=be44d089-ef21-4028-93e5-1a8cf0d83b61)
[2024-02-14T12:00:17.964Z] Trigger Details: PartionId: 0, Offset: 23184, EnqueueTimeUtc: 2024-02-14T12:00:17.2660000+00:00, SequenceNumber: 39, Count: 1
[2024-02-14T12:00:20.760Z] processed message: {'Name': 'Vivek', 'Contact': '0000-000-000'}
[2024-02-14T12:00:23.828Z] processed message: {'Name': 'Vaibhav', 'Contact': '1111-111-111'}
[2024-02-14T12:00:26.299Z] processed message: {'Name': 'Shandilya', 'Contact': '2222-222-222'}
[2024-02-14T12:00:28.832Z] processed message: {'Name': 'Arjun', 'Contact': '3333-333-333'}
[2024-02-14T12:00:31.448Z] processed message: {'Name': 'Karam', 'Contact': '4444-444-444'}
[2024-02-14T12:00:31.516Z] Executed 'Functions.main' (Succeeded, Id=be44d089-ef21-4028-93e5-1a8cf0d83b61, Duration=13623ms)