I have configured SQS notifications on an S3 bucket. SQS should start 21 Step Functions executions for the 21 files uploaded to the bucket, but only 14 executions are triggered and I am missing the remaining 7 events.

When I upload 21 files to the S3 bucket, messages are pushed to SQS, which triggers a Lambda. I have set the Lambda's concurrency to 5, so the first 5 Step Functions executions should start, and since we uploaded 21 files, a total of 21 executions should run in batches of 5. Right now only 14 Step Functions executions run, not 21.
Lambda code:
```python
import json
import boto3
import os
import uuid
import time

client = boto3.client('stepfunctions')
ipparam = {}
delay_seconds = 5
max_retry = 3

def lambda_handler(event, context):
    print('event', event)
    record = event['Records'][0]
    body = record['body']
    s3EventBody = json.loads(body)
    s3EventBody = s3EventBody['Records']
    bucket = s3EventBody[0]['s3']['bucket']['name']
    key = s3EventBody[0]['s3']['object']['key']
    #bucket_name = event['detail']['requestParameters']['bucketName']
    #object_key = event['detail']['requestParameters']['key']
    ipparam["bucket"] = bucket
    ipparam["key"] = key
    print('bucket', bucket)
    print('key', key)
    transactionid = str(uuid.uuid4())
    for retry_attempt in range(1, max_retry + 1):
        try:
            response = client.start_execution(
                stateMachineArn=os.environ['stepFunctionArn'],
                name=transactionid,
                input=json.dumps(ipparam)
            )
            print('responssee', response)
            executionArn = response['executionArn']
            break
        except Exception as e:
            print("Error Invoking step function : ", e)
            if retry_attempt < max_retry:
                print('retrying in seconds : ', delay_seconds)
                time.sleep(delay_seconds)
            else:
                print('Max retries exceeded. Please check logs for further details')
                raise
    for i in range(1, 15):
        response = client.describe_execution(
            executionArn=executionArn
        )
        execution_status = response['status']
        print('execution_status', execution_status)
        if execution_status in ('SUCCEEDED', 'FAILED'):
            return {
                'statusCode': 200,
                'body': execution_status
            }
        time.sleep(60)
```
As far as I can tell, your code only processes the first record in the `Records` array. A single Lambda invocation may receive multiple SQS messages, and each SQS message may in turn contain multiple S3 events, so the rest are silently dropped.
https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-struct.html
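A minimal sketch of the fix: iterate over every SQS record in the event and every S3 event inside each message body, then start one Step Functions execution per object. The extraction is pulled into a pure helper so it can be shown without AWS calls; `start_one` is a hypothetical wrapper standing in for your existing `start_execution` retry loop.

```python
import json

def extract_objects(event):
    """Yield (bucket, key) for every S3 event in every SQS record.

    Both levels must be iterated: one Lambda invocation can carry
    several SQS messages, and each message body can carry several
    S3 event records.
    """
    for record in event['Records']:               # one entry per SQS message
        body = json.loads(record['body'])
        for s3_event in body.get('Records', []):  # one entry per S3 event
            yield (s3_event['s3']['bucket']['name'],
                   s3_event['s3']['object']['key'])

def lambda_handler(event, context):
    for bucket, key in extract_objects(event):
        # start_one is a hypothetical helper: it would wrap
        # client.start_execution with your retry loop, passing a fresh
        # uuid name and {"bucket": bucket, "key": key} as input.
        start_one(bucket, key)
```

Note that per-object state (like your module-level `ipparam` dict and the `transactionid`) must be built inside the loop, once per object, rather than once per invocation.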