如何在下班后发送sqs队列消息?

问题描述 投票:0回答:1

我有一个与 SQS(简单队列服务)队列集成的 AWS Lambda 函数,其中包含用于存储失败消息的死信队列 (DLQ)。我们的目标是实现重试机制:一旦消息到达 DLQ,我们的目标是等待 6 小时,然后重新尝试将其发送回主队列进行进一步处理,最终将其转发到 Spring Boot 服务。

虽然 AWS Lambda 函数的最长运行时间为 15 分钟,但在 6 小时后编排操作却构成了挑战。为了解决这个问题,我们探索了一种自定义解决方案,而不是依赖内置的 SQS 重新驱动功能。

我们设计了一个流程,在消息进入 DLQ 时触发 AWS Lambda 函数。该函数作为延迟机制,将有效地将消息“保留”6 小时,然后将其重新插入主处理队列。随后,该消息将被发送到 Spring Boot 服务进行处理。

这种方法提供了对重试机制的灵活性和控制,使我们能够根据我们的特定用例对其进行定制。 这是我们的代码: `def move_messages_to_source_queue(dlq_url,source_url,visibility_timeout,delay_seconds): print("正在输入 move_messages_to_source_queue。")

while True:
    try:
        response = sqs_client.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=1
        )

        messages = response.get('Messages', [])
        if not messages:
            logger.error("No messages in DLQ")
            break

        message = messages[0]
        receipt_handle = message['ReceiptHandle']

        # Change the visibility timeout and delete the message in one operation
        sqs_client.change_message_visibility(
            QueueUrl=dlq_url,
            ReceiptHandle=receipt_handle,
            VisibilityTimeout=visibility_timeout
        )

        sqs_client.delete_message(
            QueueUrl=dlq_url,
            ReceiptHandle=receipt_handle
        )

        # Send the message back to the source queue
        sqs_client.send_message(
            QueueUrl=source_url,
            MessageBody=message['Body']
        )
        
        msg_to_send = json.dumps(message['Body'])
        msg_info = f"🔁 Moved message from DLQ back to source queue: {msg_to_send}"
        logger.info(msg_info)
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        # Handle the error as needed

    # Introduce a delay before processing the next message}
    print("Delay")
    time.sleep(delay_seconds)`

为了将信息发送到服务,我们有另一种方法:

response= requests.post(url, json=post_data, headers=headers_input)

如您所见,该函数尝试将消息从 dlq 移至主队列,但缺少 6 小时内重试服务的情况。

amazon-web-services amazon-sqs
1个回答
0
投票

忘记这一切。

使用

Visibility Timeout
将队列中的消息保留/隐藏 6 [最多 12] 小时。

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html

© www.soinside.com 2019 - 2024. All rights reserved.