我们使用Step函数动态并行与Map状态来实现并发。 是否可以将值从映射状态步骤函数中的上游任务(lambda 或从文件读取)传递到“MaxConcurrency”字段。
当前代码:
"Type": "Map",
"InputPath": "$.detail",
"ItemsPath": "$.shipped",
"MaxConcurrency": 3,
"ResultPath": "$.detail.shipped",
期望(从 lambda 任务或读取文件任务将输入传递给 MaxConcurrency):
"Type": "Map",
"InputPath": "$.detail",
"ItemsPath": "$.shipped",
"MaxConcurrency": "$.input",
"ResultPath": "$.detail.shipped"
出现错误,因为它仅支持整数。
您可以在状态机定义时设置
maxConcurrency
,但不能在执行时设置。正如您所经历的,Map 的 maxConcurrency
需要 number
,但状态机语言使用 strings
来动态传递变量。
(注意:Step Functions 默认情况下是并发的。Docs:
maxConcurrency
的 “默认值为 0,这对并行性没有配额,迭代会尽可能同时调用”。)
在执行时动态限制并发的解决方法是“选择状态”,它根据输入变量分支到离散映射状态。每个分支的地图都有不同的 maxConcurrency
,但其他方面都是相同的。添加您需要的任何离散 maxConcurrency
选择。 Choice 还接受默认情况来捕获不匹配的选择输入。
// execution input
{
"concurrency": 5,
"jobs": [ { "jobId": 1 }, { "jobId": 2 }, { "jobId": 3 }, { "jobId": 4}]
}
// state machine definition (partial)
"States": {
"Max-Concurrency-Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.concurrency",
"NumericEquals": 5,
"Next": "MapState-MaxConcurrency-5" // maxConcurrency in this branch is set at 5
},
{
"Variable": "$.concurrency",
"NumericEquals": 10,
"Next": "MapState-MaxConcurrency-10" // maxConcurrency in this branch is set at 10
}
],
"Default": "MapState-MaxConcurrency-1" // maxConcurrency in the default branch is set at 1
},
将您的 Sfn 嵌套在新的 Sfn 中。新的父 Sfn 在输入中采用 maxConcurrency
在 Lambda 任务中,使用当前子 Sfn 的新字符串化 JSON 状态机定义调用 UpdateStateMachine
maxConurrency