Fifo-SQS lambda 触发故障处理

问题描述 投票:0回答:1

我们的系统使用 Fifo SQS 队列来驱动 lambda。以下来自我们的 SAM 模板:

  EventParserTriggeringQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600  # 14 Days (max)
      FifoQueue: true
      ContentBasedDeduplication: true
      VisibilityTimeout: 240  # Must be > EventParser Timeout
      Tags:
        - Key: "datadog"
          Value: "true"
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt EventParserDeadLetters.Arn
        maxReceiveCount: 1

  EventParser:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: lambdas/event_parser_lambda/
      Handler: event_parser.lambda_handler
      Timeout: 120
      Events:
        EventParserTriggeringQueueEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt EventParserTriggeringQueue.Arn
            BatchSize: 1
            ScalingConfig:
              MaximumConcurrency: 2
      Policies:
        Statement:
          - Action:
              - ssm:GetParametersByPath
              - ssm:GetParameters
              - ssm:GetParameter
            Effect: Allow
            Resource:
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/datadog/api_key"
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/sentry/dsn"
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${AWS::StackName}/*"
          - Action:
              - sqs:DeleteMessage
              - sqs:GetQueueAttributes
              - sqs:ReceiveMessage
            Effect: Allow
            Resource: !GetAtt EventParserTriggeringQueue.Arn

  EventParserDeadLetters:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600  # 14 Days (max)
      FifoQueue: true
      ContentBasedDeduplication: true
      Tags:
        - Key: "datadog"
          Value: "true"
        - Key: "deadletter"
          Value: "true"

我正在寻找的是重试行为,如下所示:
  • 如果 lambda 失败,它会立即重试
  • 如果 lambda 失败的次数超过允许的最大失败次数,则其消息会立即进入死信队列,并且可以立即尝试下一条消息。
相反,我们看到的行为是:
  • 如果 lambda 失败,则仅在可见性超时期限后重试。这个周期必然比 lambda 的典型运行时间长,因此这里会产生很大的延迟。
  • 如果 lambda 失败的次数超过允许的最大失败次数,则消息只会在可见性超时期限后进入死信队列。
首先,让我检查一下我对系统如何工作的理解,因为它并没有真正记录在任何地方:
  • 对于 SQS 驱动的 lambda,lambda 运行时会定期调用 SQS 队列上的
    ReceiveMessage
    。从我们的系统来看,默认值是每 10 秒一次。
  • 如果有可用消息,队列将返回该消息。
  • 当队列返回消息时,它会在可见性超时时启动时钟。
    • 在可见性超时结束之前,
      ReceiveMessage
      对队列的调用(针对同一消息组 ID)将返回空。 (这是 Fifo SQS 功能。对于非 FIFO 队列,仅隐藏接收到的消息。)
    • 当可见性超时已过时,如果至少已收到队列的
      maxReceiveCount
      的头消息,则队列放弃该消息,可以选择将其放置在死信队列中。
  • lambda 运行时将消息传递给 lambda 函数。
  • 如果函数成功,运行时将调用队列上的
    DeleteMessage
    。这将删除头消息,并使下一条消息可用(即清除可见性超时)。
  • 如果消息失败,运行时将继续运行,就好像什么也没发生一样:
    • 它定期轮询队列,这意味着它会得到空响应
      ReceiveMessage
      直到可见性超时结束
    • 一旦超过可见性超时,队列将再次返回相同的消息。或者,如果消息已收到至少其“最大接收计数”,则队列将返回下一条消息。
我考虑过的一个解决方案:

基本上,让 lambda 负责:

  • 将重试逻辑放入 lambda 中的循环中
  • 如果 lambda 没有成功完成其循环,请让它显式地将消息排入我们将用于死信的 SQS 队列。该队列不会被配置为 DLQ,只有我们会这样使用它。
  • lambda 总是成功返回,因此 lambda 运行时总是从 Fifo 输入队列中删除消息。
这是我能做的最好的事情吗?

这种方法的一个严重问题是,lambda 函数运行时间不能超过 15 分钟,我确实担心重试 5 次可能会让我们面临风险。

amazon-web-services aws-lambda amazon-sqs retry-logic aws-sqs-fifo
1个回答
0
投票

这个答案是特定于 python 的,但希望能够很容易地转换为其他实现。

从广义上讲,是的,当出现故障时,lambda 必须负责队列处理。

我编写了以下装饰器,将其附加到 SQS 触发的 lambda 的所有入口点函数:

def sqs_triggered(func) -> Callable:
    """
    Decorator function for lambdas invoked by SQS messages.

    - Because SQS events can be batched, this decorator will invoke the function once per record.
    - If the function fails, this decorator will adjust the visibility timeout of the failed records to 0. If
      there were any successful records, it will delete them from the queue before propagating the error.
    """

    def decorator(event, *args, **kwargs):
        for i, record in enumerate(event["Records"]):
            try:
                func(json.loads(record["body"]), *args, **kwargs)
            except Exception as e:
                if "eventSourceARN" in record:
                    queue_name = record["eventSourceARN"].split(":")[-1]
                    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
                    for successful_record in event["Records"][:i]:
                        sqs_client.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=successful_record["receiptHandle"],
                        )
                    for failed_record in event["Records"][i:]:
                        sqs_client.change_message_visibility(
                            QueueUrl=queue_url,
                            ReceiptHandle=failed_record["receiptHandle"],
                            VisibilityTimeout=0,
                        )
                raise e

    return decorator

© www.soinside.com 2019 - 2024. All rights reserved.