我正在使用 python v2 azure 事件网格输出绑定,使用来自微软官方文档的代码。我正在尝试使用托管身份而不是主题端点和密钥。但在这里
@app.event_grid_output(
arg_name="outputEvent",
topic_endpoint_uri="MyEventGridTopicUriSetting",
topic_key_setting="MyEventGridTopicKeySetting")
这里应该做什么? 我不断收到以下错误
无法解析属性值 'EventGridAttribute.TopicEndpointUri'。
import logging
import azure.functions as func
import datetime
@app.function_name(name="eventgrid_output")
@app.route(route="eventgrid_output")
@app.event_grid_output(
arg_name="outputEvent",
topic_endpoint_uri="MyEventGridTopicUriSetting",
topic_key_setting="MyEventGridTopicKeySetting")
def eventgrid_output(eventGridEvent: func.EventGridEvent,
outputEvent: func.Out[func.EventGridOutputEvent]) -> None:
logging.log("eventGridEvent: ", eventGridEvent)
outputEvent.set(
func.EventGridOutputEvent(
id="test-id",
data={"tag1": "value1", "tag2": "value2"},
subject="test-subject",
event_type="test-event-1",
event_time=datetime.datetime.utcnow(),
data_version="1.0"))
要使用托管身份实现事件网格触发器 Azure 功能,您必须使用 connection 属性。
使用以下功能代码:
app = func.FunctionApp()
@app.function_name(name="eventgrid_out")
@app.event_grid_trigger(arg_name="eventGridEvent")
@app.event_grid_output(
arg_name="outputEvent",
connection="mytesteventgrid__topicEndpointUri")
def eventgrid_output(eventGridEvent: func.EventGridEvent,
outputEvent: func.Out[func.EventGridOutputEvent]) -> None:
logging.info("eventGridEvent: %s", eventGridEvent)
outputEvent.set(
func.EventGridOutputEvent(
id="test-id",
data={"tag1": "value1", "tag2": "value2"},
subject="test-subject",
event_type="test-event-1",
event_time=datetime.datetime.utcnow(),
data_version="1.0"))
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"mytesteventgrid__topicEndpointUri":"https://<topic_name>.southindia-1.eventgrid.azure.net/api/events"
}
}
EventGrid Contributor,EventGrid Data Sender
以在运行时访问事件网格主题。发送事件:
参考资料: