我已成功设置 Azure Durable 函数,我可以执行它并看到 http 触发器激活,日志文件证明了这一点。我还可以看到 Orchestrator_trigger 被调用。
我已设置配置设置以启用 v2 编程模型:
AzureWebJobsFeatureFlags=EnableWorkerIndexing
按照说明这里
这里分别是 Orchestrator 和 Activity_trigger 的其余代码:
# Orchestrator
@bp.orchestration_trigger(context_name="context")
def my_orchestrator(context:df.DurableOrchestrationContext):
data = context.get_input()
logging.info(f"Orchestrator received input data: {data}")
activity_data = {"mydata": data}
serialised_activity_data = json.dumps(json_activity_data)
# I DONT SEE ANY LOGGING BEYOND HERE
result = yield context.call_activity("calculator", serialised_activity_data)
logging.info(f"Activity result: {result}")
然后我在这里定义了activity_trigger。
@bp.activity_trigger(input_name="activity_data")
def calculator(activity_data):
json_activity_data = json.loads(activity_data)
data = json_activity_data["mydata"]
logging.info("Executing Trigger")
// DO ACTIVITY
日志在上面突出显示的点之后停止。
我不知道应该在 Azure Function App 中的哪个位置查找。我可以在协调器日志中看到移动的证据(参见附图)。
我还在我的 function_app.py 中正确设置了蓝图,否则它将首先被调用。
最后我尝试调整日志记录以实现现在的改进:
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true
}
},
"logLevel": {
"default": "Information",
"Host.Results": "Information",
"Function": "Information",
"Host.Aggregator": "Information"
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.0.0, 5.0.0)"
}
}
我尝试改进日志记录。 我尝试逐步完成各个阶段并参考大量示例,但实际上没有任何提示说明为什么 Activity_trigger 没有被触发。
使用Python编程模型V2的Azure Durable Function不调用activity_trigger
要调用持久功能活动触发器,请使用以下代码:
function_app.py:
import azure.functions as func
import azure.durable_functions as df
import json
import logging
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
function_name = req.route_params.get('functionName')
instance_id = await client.start_new(function_name)
response = client.create_check_status_response(req, instance_id)
return response
@myApp.orchestration_trigger(context_name="context")
def rith_orchestrator(context):
rith_act_data = {"testdata": "Rithwik"}
rith_serialised_act_data = json.dumps(rith_act_data)
logging.info(f"Activity result: {rith_serialised_act_data}")
rith_result = yield context.call_activity("hello_rithwik", rith_serialised_act_data)
return rith_result
@myApp.activity_trigger(input_name="rithjson")
def hello_rithwik(rithjson: str):
json_activity_data = json.loads(rithjson)
data = json_activity_data["testdata"]
logging.info("Helo Rithwik, Executing Activity Trigger")
return data
需求.txt
azure-functions
azure-functions-durable
主机.json
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=rithwik;AccountKey=pPBW9jWu77Sqp5F+NLWkA==;EndpointSuffix=core.windows.net",
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
}
}
调用函数:
在 URL 中将 functionName 替换为 Orchestrator 名称,如下所示:
http://localhost:7071/api/orchestrators/rith_orchestrator
输出:
现在使用第一个URL
statusQueryGetUri
,你将得到序列化的数据: