Our system uses a FIFO SQS queue to drive a lambda. The following is from our SAM template:
```yaml
EventParserTriggeringQueue:
  Type: AWS::SQS::Queue
  Properties:
    MessageRetentionPeriod: 1209600  # 14 Days (max)
    FifoQueue: true
    ContentBasedDeduplication: true
    VisibilityTimeout: 240  # Must be > EventParser Timeout
    Tags:
      - Key: "datadog"
        Value: "true"
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt EventParserDeadLetters.Arn
      maxReceiveCount: 1

EventParser:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: lambdas/event_parser_lambda/
    Handler: event_parser.lambda_handler
    Timeout: 120
    Events:
      EventParserTriggeringQueueEvent:
        Type: SQS
        Properties:
          Queue: !GetAtt EventParserTriggeringQueue.Arn
          BatchSize: 1
          ScalingConfig:
            MaximumConcurrency: 2
    Policies:
      Statement:
        - Action:
            - ssm:GetParametersByPath
            - ssm:GetParameters
            - ssm:GetParameter
          Effect: Allow
          Resource:
            - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/datadog/api_key"
            - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/sentry/dsn"
            - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${AWS::StackName}/*"
        - Action:
            - sqs:DeleteMessage
            - sqs:GetQueueAttributes
            - sqs:ReceiveMessage
          Effect: Allow
          Resource: !GetAtt EventParserTriggeringQueue.Arn

EventParserDeadLetters:
  Type: AWS::SQS::Queue
  Properties:
    MessageRetentionPeriod: 1209600  # 14 Days (max)
    FifoQueue: true
    ContentBasedDeduplication: true
    Tags:
      - Key: "datadog"
        Value: "true"
      - Key: "deadletter"
        Value: "true"
```
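For context, anything publishing to this queue must set a `MessageGroupId` (mandatory for FIFO queues), while `ContentBasedDeduplication: true` means SQS derives the deduplication ID from a hash of the body, so no explicit `MessageDeduplicationId` is needed. A minimal sketch of building the `send_message` arguments (the helper name, queue URL, and group ID below are illustrative, not from our stack):

```python
import json


def fifo_send_kwargs(queue_url: str, payload: dict, group_id: str) -> dict:
    """Build send_message kwargs for a FIFO queue with content-based dedup.

    MessageGroupId is required for FIFO queues; no MessageDeduplicationId
    is needed because the queue hashes the message body itself.
    """
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(payload),
        "MessageGroupId": group_id,
    }


# Hypothetical queue URL and group id, for illustration only.
kwargs = fifo_send_kwargs(
    "https://sqs.us-east-1.amazonaws.com/123456789012/EventParserTriggeringQueue.fifo",
    {"event": "created"},
    "tenant-42",
)
# In real code: boto3.client("sqs").send_message(**kwargs)
```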
As far as I can tell, the queue behaves like this:

- The lambda service polls the queue with `ReceiveMessage`. From our system's behaviour, the default is about every 10 seconds.
- While the head message is in flight, `ReceiveMessage` calls against the queue (for the same message group ID) return empty. (This is a FIFO SQS feature; for a non-FIFO queue, only the received message itself is hidden.)
- If the queue has handed out the head message `maxReceiveCount` times, it gives up on the message, optionally moving it to a dead-letter queue.
- On success, the lambda calls `DeleteMessage`. This removes the head message and makes the next message available (i.e. clears the visibility timeout).
- On failure, the head message stays in flight, blocking `ReceiveMessage` until the visibility timeout expires.

Basically, this leaves the lambda responsible for queue handling.
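The head-of-line blocking described above can be illustrated with a toy in-memory model of a single message group. This is a teaching sketch, not the SQS API: the class, method names, and timings are all invented for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Message:
    body: str
    invisible_until: float = 0.0  # epoch seconds; 0 means visible


class ToyFifoQueue:
    """In-memory model of ONE message group in a FIFO queue.

    Only the head message is ever handed out, and while it is in flight
    every further receive returns None (head-of-line blocking).
    """

    def __init__(self, visibility_timeout: float):
        self.visibility_timeout = visibility_timeout
        self.messages: List[Message] = []

    def send(self, body: str) -> None:
        self.messages.append(Message(body))

    def receive(self, now: float) -> Optional[str]:
        if not self.messages:
            return None
        head = self.messages[0]
        if head.invisible_until > now:
            return None  # head in flight: the whole group is blocked
        head.invisible_until = now + self.visibility_timeout
        return head.body

    def delete_head(self) -> None:
        self.messages.pop(0)  # like DeleteMessage: next message becomes available

    def reset_visibility(self) -> None:
        self.messages[0].invisible_until = 0.0  # retry immediately


q = ToyFifoQueue(visibility_timeout=240)
q.send("first")
q.send("second")

assert q.receive(now=0) == "first"
assert q.receive(now=10) is None   # same group, head in flight -> empty
q.delete_head()                    # success path: DeleteMessage
assert q.receive(now=11) == "second"
```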
A serious problem with this approach is that a lambda function cannot run for more than 15 minutes, and I do worry that retrying five times could put us at risk.
This answer is Python-specific, but hopefully translates easily to other implementations.

Broadly speaking, yes: when a failure occurs, the lambda has to take responsibility for the queue handling.
I wrote the following decorator, which I attach to the entry-point function of every SQS-triggered lambda:
```python
import json
from typing import Callable

import boto3

sqs_client = boto3.client("sqs")


def sqs_triggered(func) -> Callable:
    """
    Decorator function for lambdas invoked by SQS messages.

    - Because SQS events can be batched, this decorator will invoke the function once per record.
    - If the function fails, this decorator will adjust the visibility timeout of the failed
      records to 0. If there were any successful records, it will delete them from the queue
      before propagating the error.
    """
    def decorator(event, *args, **kwargs):
        for i, record in enumerate(event["Records"]):
            try:
                func(json.loads(record["body"]), *args, **kwargs)
            except Exception:
                if "eventSourceARN" in record:
                    # The ARN's last segment is the queue name, e.g.
                    # arn:aws:sqs:us-east-1:123456789012:MyQueue.fifo
                    queue_name = record["eventSourceARN"].split(":")[-1]
                    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
                    # Remove the records that already succeeded so they are not retried.
                    for successful_record in event["Records"][:i]:
                        sqs_client.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=successful_record["receiptHandle"],
                        )
                    # Make the failed record (and everything after it) visible again immediately.
                    for failed_record in event["Records"][i:]:
                        sqs_client.change_message_visibility(
                            QueueUrl=queue_url,
                            ReceiptHandle=failed_record["receiptHandle"],
                            VisibilityTimeout=0,
                        )
                raise
    return decorator
```
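One way to sanity-check the bookkeeping without touching AWS is to exercise the decorator against a stub client. The stub, the sample handler, and the event below are all illustrative; the decorator body is repeated here (bound to the stub) so the snippet is self-contained:

```python
import json
from typing import Callable


class StubSQSClient:
    """Records SQS calls instead of making them (local testing only)."""

    def __init__(self):
        self.deleted = []
        self.reset = []

    def get_queue_url(self, QueueName):
        return {"QueueUrl": f"https://sqs.local/{QueueName}"}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.deleted.append(ReceiptHandle)

    def change_message_visibility(self, QueueUrl, ReceiptHandle, VisibilityTimeout):
        self.reset.append((ReceiptHandle, VisibilityTimeout))


sqs_client = StubSQSClient()


def sqs_triggered(func) -> Callable:
    # Same logic as the decorator above, bound to the stub client.
    def decorator(event, *args, **kwargs):
        for i, record in enumerate(event["Records"]):
            try:
                func(json.loads(record["body"]), *args, **kwargs)
            except Exception:
                if "eventSourceARN" in record:
                    queue_name = record["eventSourceARN"].split(":")[-1]
                    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
                    for ok in event["Records"][:i]:
                        sqs_client.delete_message(
                            QueueUrl=queue_url, ReceiptHandle=ok["receiptHandle"]
                        )
                    for failed in event["Records"][i:]:
                        sqs_client.change_message_visibility(
                            QueueUrl=queue_url,
                            ReceiptHandle=failed["receiptHandle"],
                            VisibilityTimeout=0,
                        )
                raise
    return decorator


@sqs_triggered
def handler(body):
    if body.get("boom"):
        raise ValueError("simulated failure")


event = {
    "Records": [
        {"body": json.dumps({"n": 1}), "receiptHandle": "rh-1",
         "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:Q.fifo"},
        {"body": json.dumps({"boom": True}), "receiptHandle": "rh-2",
         "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:Q.fifo"},
    ]
}

try:
    handler(event)
except ValueError:
    pass

print(sqs_client.deleted)  # -> ['rh-1']         (successful record removed)
print(sqs_client.reset)    # -> [('rh-2', 0)]    (failed record retried immediately)
```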