AWS Step Function - 动态并行度 MaxConcurrency 字段

问题描述 投票:0回答:2

我们使用Step函数动态并行与Map状态来实现并发。 是否可以将值从映射状态步骤函数中的上游任务(lambda 或从文件读取)传递到“MaxConcurrency”字段。

当前代码:

"Type": "Map",
"InputPath": "$.detail",
"ItemsPath": "$.shipped",
"MaxConcurrency": 3,
"ResultPath": "$.detail.shipped",

期望(从 lambda 任务或读取文件任务将输入传递给 MaxConcurrency):

 "Type": "Map",
 "InputPath": "$.detail",
 "ItemsPath": "$.shipped",
 "MaxConcurrency": "$.input",
 "ResultPath": "$.detail.shipped"

出现错误,因为它仅支持整数。

amazon-web-services aws-lambda concurrency aws-step-functions
2个回答
1
投票

您可以在状态机定义时设置

maxConcurrency
,但不能在执行时设置。正如您所经历的,Map 的
maxConcurrency
需要
number
,但状态机语言使用
strings
来动态传递变量。

注意:Step Functions 默认情况下是并发的。Docs

maxConcurrency
“默认值为 0,这对并行性没有配额,迭代会尽可能同时调用”。)

选项1:选择+地图(难度:低)

在执行时动态限制并发的解决方法是“选择状态”,它根据输入变量分支到离散映射状态。每个分支的地图都有不同的 maxConcurrency,但其他方面都是相同的。添加您需要的任何离散 maxConcurrency

 选择。 Choice 还接受默认情况来捕获不匹配的选择输入。
// execution input
{
  "concurrency": 5,
  "jobs": [ { "jobId": 1 }, { "jobId": 2 }, { "jobId": 3 }, { "jobId": 4}]
}

// state machine definition (partial) "States": { "Max-Concurrency-Choice": { "Type": "Choice", "Choices": [ { "Variable": "$.concurrency", "NumericEquals": 5, "Next": "MapState-MaxConcurrency-5" // maxConcurrency in this branch is set at 5 }, { "Variable": "$.concurrency", "NumericEquals": 10, "Next": "MapState-MaxConcurrency-10" // maxConcurrency in this branch is set at 10 } ], "Default": "MapState-MaxConcurrency-1" // maxConcurrency in the default branch is set at 1 },

方案2:嵌套Sfn + API调用(难度:高)

将您的 Sfn 嵌套在新的 Sfn 中。新的父 Sfn 在输入中采用

maxConcurrency

。它有两个任务:

在 Lambda 任务中,使用当前子 Sfn 的新字符串化 JSON 状态机定义调用 

UpdateStateMachine
    API。
  1. 调用您当前的状态机。 Sfn 将拥有新的 maxConurrency
  2.     
不确定何时添加此参数,但 Map 步骤有一个可选的

0
投票
参数。我能够用这样的东西让它在我的堆栈中工作

JoinLoop:
    Type: Map
    End: True
    MaxConcurrencyPath: $.Concurrency

上一步中设置的并发。

https://docs.aws.amazon.com/step-functions/latest/dg/use-dist-map-orchestrate-large-scale-parallel-workloads.html

© www.soinside.com 2019 - 2024. All rights reserved.