使用Python编程模型V2的Azure Durable Function不调用activity_trigger

问题描述 投票:0回答:1

我已成功设置 Azure Durable 函数,我可以执行它并看到 http 触发器激活,日志文件证明了这一点。我还可以看到 Orchestrator_trigger 被调用。

我已设置配置设置以启用 v2 编程模型:

AzureWebJobsFeatureFlags=EnableWorkerIndexing

按照说明这里

这里分别是 Orchestrator 和 Activity_trigger 的其余代码:

# Orchestrator
@bp.orchestration_trigger(context_name="context")
def my_orchestrator(context:df.DurableOrchestrationContext):    
    
    data = context.get_input()    
    logging.info(f"Orchestrator received input data: {data}")    
    activity_data = {"mydata": data}
    serialised_activity_data = json.dumps(json_activity_data)
    
    # I DONT SEE ANY LOGGING BEYOND HERE

    result = yield context.call_activity("calculator", serialised_activity_data)
    logging.info(f"Activity result: {result}")

然后我在这里定义了activity_trigger。

@bp.activity_trigger(input_name="activity_data")
def calculator(activity_data):
    
    json_activity_data = json.loads(activity_data)
    data = json_activity_data["mydata"]
    
    logging.info("Executing Trigger")

    // DO ACTIVITY


日志在上面突出显示的点之后停止。
我不知道应该在 Azure Function App 中的哪个位置查找。我可以在协调器日志中看到移动的证据(参见附图)。

我还在我的 function_app.py 中正确设置了蓝图,否则它将首先被调用。

最后我尝试调整日志记录以实现现在的改进:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true
      }
    },
    "logLevel": {
      "default": "Information",
      "Host.Results": "Information",
      "Function": "Information",
      "Host.Aggregator": "Information"
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.0.0, 5.0.0)"
  }
}

我尝试改进日志记录。 我尝试逐步完成各个阶段并参考大量示例,但实际上没有任何提示说明为什么 Activity_trigger 没有被触发。

python azure azure-durable-functions
1个回答
0
投票

使用Python编程模型V2的Azure Durable Function不调用activity_trigger

要调用持久功能活动触发器,请使用以下代码

function_app.py:

import azure.functions as func
import azure.durable_functions as df
import json
import logging

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name)
    response = client.create_check_status_response(req, instance_id)
    return response

@myApp.orchestration_trigger(context_name="context")
def rith_orchestrator(context):
    
    rith_act_data = {"testdata": "Rithwik"}
    rith_serialised_act_data = json.dumps(rith_act_data)
    logging.info(f"Activity result: {rith_serialised_act_data}")
    rith_result = yield context.call_activity("hello_rithwik", rith_serialised_act_data)
    return rith_result

@myApp.activity_trigger(input_name="rithjson")
def hello_rithwik(rithjson: str):
    json_activity_data = json.loads(rithjson)
    data = json_activity_data["testdata"] 
    logging.info("Helo Rithwik, Executing Activity Trigger")
    return data

需求.txt

azure-functions
azure-functions-durable

主机.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

local.settings.json:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=rithwik;AccountKey=pPBW9jWu77Sqp5F+NLWkA==;EndpointSuffix=core.windows.net",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
  }
}

调用函数:

enter image description here

在 URL 中将 functionName 替换为 Orchestrator 名称,如下所示:

http://localhost:7071/api/orchestrators/rith_orchestrator

输出:

enter image description here

现在使用第一个URL

statusQueryGetUri
,你将得到序列化的数据:

enter image description here

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.