我有一个与 SQS(简单队列服务)队列集成的 AWS Lambda 函数,其中包含用于存储失败消息的死信队列 (DLQ)。我们的目标是实现重试机制:一旦消息到达 DLQ,我们的目标是等待 6 小时,然后重新尝试将其发送回主队列进行进一步处理,最终将其转发到 Spring Boot 服务。
虽然 AWS Lambda 函数的最长运行时间为 15 分钟,但在 6 小时后编排操作却构成了挑战。为了解决这个问题,我们探索了一种自定义解决方案,而不是依赖内置的 SQS 重新驱动功能。
我们设计了一个流程,在消息进入 DLQ 时触发 AWS Lambda 函数。该函数作为延迟机制,将有效地将消息“保留”6 小时,然后将其重新插入主处理队列。随后,该消息将被发送到 Spring Boot 服务进行处理。
这种方法提供了对重试机制的灵活性和控制,使我们能够根据我们的特定用例对其进行定制。 这是我们的代码: `def move_messages_to_source_queue(dlq_url,source_url,visibility_timeout,delay_seconds): print("正在输入 move_messages_to_source_queue。")
while True:
try:
response = sqs_client.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=1
)
messages = response.get('Messages', [])
if not messages:
logger.error("No messages in DLQ")
break
message = messages[0]
receipt_handle = message['ReceiptHandle']
# Change the visibility timeout and delete the message in one operation
sqs_client.change_message_visibility(
QueueUrl=dlq_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=visibility_timeout
)
sqs_client.delete_message(
QueueUrl=dlq_url,
ReceiptHandle=receipt_handle
)
# Send the message back to the source queue
sqs_client.send_message(
QueueUrl=source_url,
MessageBody=message['Body']
)
msg_to_send = json.dumps(message['Body'])
msg_info = f"🔁 Moved message from DLQ back to source queue: {msg_to_send}"
logger.info(msg_info)
except Exception as e:
logger.error(f"An error occurred: {e}")
# Handle the error as needed
# Introduce a delay before processing the next message}
print("Delay")
time.sleep(delay_seconds)`
为了将信息发送到服务,我们有另一种方法:
response= requests.post(url, json=post_data, headers=headers_input)
如您所见,该函数尝试将消息从 dlq 移至主队列,但缺少 6 小时内重试服务的情况。
忘记这一切。
使用
Visibility Timeout
将队列中的消息保留/隐藏 6 [最多 12] 小时。