使用托管身份python v2进行Azure事件网格输出绑定,我不断收到“EventGridAttribute.TopicEndpointUri”

问题描述 投票:0回答:1

我正在使用 python v2 azure 事件网格输出绑定,使用来自微软官方文档的代码。我正在尝试使用托管身份而不是主题端点和密钥。但在这里

@app.event_grid_output(
    arg_name="outputEvent",
    topic_endpoint_uri="MyEventGridTopicUriSetting",
    topic_key_setting="MyEventGridTopicKeySetting")

这里应该做什么? 我不断收到以下错误

无法解析属性值 'EventGridAttribute.TopicEndpointUri'。

import logging
import azure.functions as func
import datetime

@app.function_name(name="eventgrid_output")
@app.route(route="eventgrid_output")
@app.event_grid_output(
    arg_name="outputEvent",
    topic_endpoint_uri="MyEventGridTopicUriSetting",
    topic_key_setting="MyEventGridTopicKeySetting")
def eventgrid_output(eventGridEvent: func.EventGridEvent, 
         outputEvent: func.Out[func.EventGridOutputEvent]) -> None:

    logging.log("eventGridEvent: ", eventGridEvent)

    outputEvent.set(
        func.EventGridOutputEvent(
            id="test-id",
            data={"tag1": "value1", "tag2": "value2"},
            subject="test-subject",
            event_type="test-event-1",
            event_time=datetime.datetime.utcnow(),
            data_version="1.0"))
python azure azure-functions azure-eventgrid
1个回答
0
投票

要使用托管身份实现事件网格触发器 Azure 功能,您必须使用 connection 属性。

使用以下功能代码:

app = func.FunctionApp()

@app.function_name(name="eventgrid_out")
@app.event_grid_trigger(arg_name="eventGridEvent")
@app.event_grid_output(
    arg_name="outputEvent",
connection="mytesteventgrid__topicEndpointUri")
def eventgrid_output(eventGridEvent: func.EventGridEvent, 
         outputEvent: func.Out[func.EventGridOutputEvent]) -> None:

    logging.info("eventGridEvent: %s", eventGridEvent)

    outputEvent.set(
        func.EventGridOutputEvent(
            id="test-id",
            data={"tag1": "value1", "tag2": "value2"},
            subject="test-subject",
            event_type="test-event-1",
            event_time=datetime.datetime.utcnow(),
            data_version="1.0"))

local.settings.json:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "mytesteventgrid__topicEndpointUri":"https://<topic_name>.southindia-1.eventgrid.azure.net/api/events"
  }
}
  • 使用托管身份时,分配角色
    EventGrid Contributor,EventGrid Data Sender
    以在运行时访问事件网格主题。

发送事件:

enter image description here

enter image description here

参考资料:

https://learn.microsoft.com/en-gb/answers/questions/1615439/does-eventgrid-output-binging-v2-support-management-i

© www.soinside.com 2019 - 2024. All rights reserved.