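I can find plenty of examples of using Lambda inside Step Functions to run a SQL script on Redshift (e.g., this one works well), but I can't figure out how to run a SQL script from Step Functions using AWS Fargate. Specifically, I can't work out how to pass the Redshift credentials needed to execute the query. With Lambda it is simple, as shown below. Passing the credentials to a Fargate task is probably similar, but what is the command to execute the SQL script?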
"States": {
  "loadJob": {
    "Type": "Pass",
    "Next": "loadJobETL",
    "Result": {
      "input": {
        "redshift_cluster_id": "<RS_CLUSTER>",
        "redshift_database": "<MY_DB>",
        "redshift_user": "<MY_DB>root",
        "redshift_schema": "MY_SCHEMA",
        "action": "load_customer_address",
        "sql_statement": [
          "begin transaction;",
          "MY_SQL_STATEMENT",
          "end transaction;"
        ]
      }
    }
  },
  "loadJobETL": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:XXXXXXXXXXX:function:SOME_FUNCTION",
    "TimeoutSeconds": 180,
    "HeartbeatSeconds": 60,
    "InputPath": "$",
    "ResultPath": "$",
    "Next": "checkStatus"
  }
}
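You can use the Step Functions optimized integration with ECS RunTask. It could look something like the following. What the code inside the Fargate container looks like depends on your language of choice, but I would expect it to use the AWS SDK and look very similar to what you already have in your Lambda code.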
{
  "StartAt": "loadJob",
  "States": {
    "loadJob": {
      "Type": "Pass",
      "Next": "loadJobETL",
      "Result": {
        "input": {
          "redshift_cluster_id": "<RS_CLUSTER>",
          "redshift_database": "<MY_DB>",
          "redshift_user": "<MY_DB>root",
          "redshift_schema": "MY_SCHEMA",
          "action": "load_customer_address",
          "sql_statement": [
            "begin transaction;",
            "MY_SQL_STATEMENT",
            "end transaction;"
          ]
        }
      }
    },
    "loadJobETL": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "arn:aws:ecs:REGION:ACCOUNT_ID:cluster/MyECSCluster",
        "TaskDefinition": "arn:aws:ecs:REGION:ACCOUNT_ID:task-definition/MyTaskDefinition:1",
        "Overrides": {
          "ContainerOverrides": [
            {
              "Name": "container-name",
              "Command.$": "States.Array(States.JsonToString($.input))"
            }
          ]
        }
      },
      "End": true
    }
  }
}
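To make the container side concrete, here is a minimal Python sketch of what the Fargate entrypoint might look like. It is not the only way to do this. It assumes the state machine hands the `input` object to the container as a single JSON string argument (for example via `States.Array(States.JsonToString($.input))` in `Command.$`), that the image has `boto3` installed, and that the ECS task role is allowed to call the Redshift Data API and obtain temporary credentials for `redshift_user`. Under those assumptions no password has to be passed at all. The function names `run_statements` and `main` are made up for illustration.

```python
import json
import sys
import time


def run_statements(client, cfg, poll_seconds=2.0):
    """Submit cfg["sql_statement"] through the Redshift Data API and wait for it."""
    submitted = client.batch_execute_statement(
        ClusterIdentifier=cfg["redshift_cluster_id"],
        Database=cfg["redshift_database"],
        DbUser=cfg["redshift_user"],  # temporary credentials are derived via IAM
        Sqls=cfg["sql_statement"],
    )
    statement_id = submitted["Id"]

    # The Data API is asynchronous, so poll until the batch reaches a final state.
    while True:
        desc = client.describe_statement(Id=statement_id)
        status = desc["Status"]
        if status == "FINISHED":
            return statement_id
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(
                f"statement {statement_id} {status}: {desc.get('Error')}"
            )
        time.sleep(poll_seconds)


def main(argv):
    # Imported here so run_statements can be exercised without AWS access.
    import boto3

    cfg = json.loads(argv[1])  # assumption: argv[1] is the JSON-encoded "input" object
    run_statements(boto3.client("redshift-data"), cfg)


if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv)
```

An uncaught `RuntimeError` makes the process exit with a non-zero code, which should mark the ECS task as failed. Note that `batch_execute_statement` runs its statements as a single transaction, so the explicit `begin transaction;` / `end transaction;` entries may be redundant here.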
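Alternatively, depending on the size of the output (state payloads in Step Functions are capped at 256 KB), you can use
arn:aws:states:::aws-sdk:redshiftdata:executeStatement
to run it directly from Step Functions.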
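As a rough sketch of that direct route, the Python snippet below builds a state machine definition as a dict and prints it as JSON. It swaps in `batchExecuteStatement` for `executeStatement` because the input in the question carries a list of statements. Since the Data API is asynchronous, it then polls `describeStatement`. The state names, the 5-second wait, and the error name are invented for illustration, and the states expect an `input` object like the one produced by the `loadJob` Pass state above.

```python
import json


def build_definition():
    """Return an ASL definition (as a dict) that runs SQL via the Redshift Data API."""
    return {
        "StartAt": "runSql",
        "States": {
            "runSql": {
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:batchExecuteStatement",
                "Parameters": {
                    "ClusterIdentifier.$": "$.input.redshift_cluster_id",
                    "Database.$": "$.input.redshift_database",
                    "DbUser.$": "$.input.redshift_user",
                    "Sqls.$": "$.input.sql_statement",
                },
                "ResultPath": "$.statement",  # keeps the returned Id next to the input
                "Next": "wait",
            },
            "wait": {"Type": "Wait", "Seconds": 5, "Next": "checkStatus"},
            "checkStatus": {
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
                "Parameters": {"Id.$": "$.statement.Id"},
                "ResultPath": "$.status",
                "Next": "isDone",
            },
            "isDone": {
                "Type": "Choice",
                "Choices": [
                    {"Variable": "$.status.Status", "StringEquals": "FINISHED", "Next": "done"},
                    {"Variable": "$.status.Status", "StringEquals": "FAILED", "Next": "failed"},
                    {"Variable": "$.status.Status", "StringEquals": "ABORTED", "Next": "failed"},
                ],
                "Default": "wait",  # still running: loop back and poll again
            },
            "done": {"Type": "Succeed"},
            "failed": {"Type": "Fail", "Error": "RedshiftStatementFailed"},
        },
    }


print(json.dumps(build_definition(), indent=2))
```

With this approach the state machine's execution role, rather than a task role, needs the Redshift Data API permissions, and there is no container to build or maintain.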