如何通过本地Azure函数应用程序在多个分区上一一处理来自Azure Eventhub的事件

问题描述 投票:0回答:1

在本地调试Python azure函数应用程序时,该应用程序处理来自具有32个分区的Azure Eventhub的事件,我想知道是否可以一一处理这些事件。 看来我设置的任何设置来限制到达的事件数量总是等于分区数量,在本例中为 32。

我的

host.json

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  },
  "extensions": {
    "eventHubs": {
      "batchCheckpointFrequency": 1,
      "eventProcessorOptions": {
        "maxBatchSize": 1,
        "prefetchCount": 1,
        "maxConcurrentCalls": 1
      },
      "initialOffsetOptions": {
        "type": "fromEnd"
      }
    }
  }
}

功能应用程序代码:

@app.event_hub_message_trigger(arg_name="azeventhub",
                               event_hub_name="eventhubname",
                               connection="EventHubConnectionString",
                               cardinality=func.Cardinality.ONE,
                               consumer_group="consumer_group_x")
async def event_processor(azeventhub: func.EventHubEvent):
    event = azeventhub.get_body().decode('utf-8')
    logging.info("Received from EventHub: %s", event)
    try:
        json_event = json.loads(event)
        await process_event(json_event)

    except Exception as e:
        logging.error("Error while processing event: %s", e)
    time.sleep(10) # it does not wait after each event

在上面的代码中,睡眠仅在处理了 32 个事件后才执行,因此我假设它同时睡眠 32 次 10 秒。 当我使代码同步时,会发生相同的行为。

这在设计上是不可能的还是有确实可以强制这样做的设置?

python azure azure-functions azure-eventhub
1个回答
0
投票

从一批事件中逐一处理事件。如本

MS Doc
中所述,将
cardinality
设置为 one

我使用服务总线客户端将事件作为消息一一接收。

这段代码对我有用

function_app.py

import azure.functions as func
import logging
import json
import os
from azure.servicebus import ServiceBusClient, ServiceBusMessage

app = func.FunctionApp()

conn = os.getenv("ServiceBus_String")
queue = "queue_event"

def send_message_to_service_bus_queue(message):
    servicebus_client = ServiceBusClient.from_connection_string(conn)
    sender = servicebus_client.get_queue_sender(queue_name=queue)
    sender.send_messages(ServiceBusMessage(message))

@app.event_hub_message_trigger(arg_name="azeventhub", event_hub_name="myeventhub",
                               connection="EventHub_string",cardinality=func.Cardinality.ONE)
def main(azeventhub: func.EventHubEvent):

    events = json.loads(azeventhub.get_body().decode("utf-8"))
    for event in events:
        send_message_to_service_bus_queue(json.dumps(event))
        logging.info("processed message: %s",event)

local.settings.json
:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    "EventHub_string": "Endpoint=sb://azevent.servicebus.windows.net/;SharedAccessKeyName=xxxxxxxxxxxxx;SharedAccessKey=xxxxxx",
    "ServiceBus_String":"Endpoint=sb://event-servicebus.servicebus.windows.net/;SharedAccessKeyName=xxxxxxxxxxx;SharedAccessKey=xxxxxxxxxxxxxx"
  }
}

INPUT

[
    {
        "Name": "Vivek",
        "Contact": "0000-000-000"
    },
    {
        "Name": "Vaibhav",
        "Contact": "1111-111-111"
    },
    {
        "Name": "Shandilya",
        "Contact": "2222-222-222"
    },
    {
        "Name": "Arjun",
        "Contact": "3333-333-333"
    },
    {
        "Name": "Karam",
        "Contact": "4444-444-444"
    }
]

OUTPUT

事件被一一处理并作为一条消息一一发送到服务总线:

Functions:

        main: eventHubTrigger

For detailed output, run func with --verbose flag.
[2024-02-14T12:00:12.883Z] Host lock lease acquired by instance ID '000000000000000000000000116B5F66'.
[2024-02-14T12:00:17.956Z] Executing 'Functions.main' (Reason='(null)', Id=be44d089-ef21-4028-93e5-1a8cf0d83b61)
[2024-02-14T12:00:17.964Z] Trigger Details: PartionId: 0, Offset: 23184, EnqueueTimeUtc: 2024-02-14T12:00:17.2660000+00:00, SequenceNumber: 39, Count: 1
[2024-02-14T12:00:20.760Z] processed message: {'Name': 'Vivek', 'Contact': '0000-000-000'}
[2024-02-14T12:00:23.828Z] processed message: {'Name': 'Vaibhav', 'Contact': '1111-111-111'}
[2024-02-14T12:00:26.299Z] processed message: {'Name': 'Shandilya', 'Contact': '2222-222-222'}
[2024-02-14T12:00:28.832Z] processed message: {'Name': 'Arjun', 'Contact': '3333-333-333'}
[2024-02-14T12:00:31.448Z] processed message: {'Name': 'Karam', 'Contact': '4444-444-444'}
[2024-02-14T12:00:31.516Z] Executed 'Functions.main' (Succeeded, Id=be44d089-ef21-4028-93e5-1a8cf0d83b61, Duration=13623ms)

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.