AWS SQS 不会触发期望的 Lambda stepfunctions ,不处理队列中的所有消息

问题描述 投票:0回答:1

我已经在 s3 存储桶上配置了 sqs,sqs 应该为 s3 存储桶中上传的 21 个文件调用 21 个步骤函数,而不是只触发 14 个步骤函数,而我缺少剩余的 7 个事件。

当我在 s3 存储桶中上传 21 个文件时,这些消息将被推送到 sqs,这将触发 lambda。我已将 lambda 并发性设置为 5。因此应启动前 5 个步骤函数。由于我们上传了 21 个文件,总共 21 个步骤函数应该以 5 个为一组执行。现在,14 个步骤函数正在依次运行,而不是 21 个

lambda 代码

import json
import boto3
import os
import uuid
import time



client = boto3.client('stepfunctions')
ipparam ={}
delay_seconds = 5
max_retry = 3

def lambda_handler(event, context):
    
    print('event',event)
    record = event['Records'][0]
    body = record['body']
    s3EventBody= json.loads(body)
    s3EventBody= s3EventBody['Records']
    
    bucket = s3EventBody[0]['s3']['bucket']['name']
    key = s3EventBody[0]['s3']['object']['key']
    
    
    
    #bucket_name = event['detail']['requestParameters']['bucketName']
    #object_key = event['detail']['requestParameters']['key']
    
    ipparam["bucket"] = bucket
    ipparam["key"] = key
    
    print('bucket',bucket)
    print('key',key)
    
    transactionid = str(uuid.uuid4())
    

    for retry_attempt in range(1,max_retry+1):
        
        try:
            response = client.start_execution(
                stateMachineArn=os.environ['stepFunctionArn'],
                name = transactionid,
                input=json.dumps(ipparam)
                )
            print('responssee',response)
            executionArn =  response['executionArn']
            break
    
        except Exception as e:
            print("Error Invoking step function : ",e)
            if retry_attempt < max_retry:
                print('retrying in seconds : ',delay_seconds )
                time.sleep(delay_seconds)
            else:
                print('Max retries exceeded. Please check logs for further details')
                raise
    
    for i in range(1,15):
        response = client.describe_execution(
            executionArn=executionArn
            )
        execution_status = response['status']
        print('execution_status',execution_status)
        
        if execution_status in ('SUCCEEDED','FAILED'):
            return {
                'statusCode': 200,
                'body': execution_status
                }
        time.sleep(60)

enter image description here enter image description here

aws-lambda bigdata amazon-sqs aws-step-functions aws-sqs-fifo
1个回答
0
投票

据我所知,您的代码仅处理记录数组中的第一条记录。因此,您可能会在每次 Lambda 函数调用中收到多条 SQS 消息。每个 SQS 消息中可能有多个 S3 事件。

https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-struct.html

© www.soinside.com 2019 - 2024. All rights reserved.