Python 中的 AWS SQS FIFO - 即使在正确的 VisbilityTimeout 之后,删除的消息仍然可用

问题描述 投票:0回答:1

我创建了一个 FIFO 队列。 我发送了 10 条消息,收到它们并删除它们。但它们仍然在队列中并再次收到。我最初收到 10 条消息,然后是 5 条,然后是 2 条,然后是 0 条。“delete_message”响应也是 200。

代码:

import json
import os
import uuid

import boto3
from dotenv import load_dotenv

load_dotenv()
sqs = boto3.client("sqs", os.getenv("AWS_REGION"))
receive_queue = os.getenv("RECEIVE_QUEUE_URL")


def receive(attempt_id, max_num_messages):
    response = sqs.receive_message(
        QueueUrl=receive_queue,
        ReceiveRequestAttemptId=attempt_id,
        MaxNumberOfMessages=max_num_messages,
        VisibilityTimeout=100,
        WaitTimeSeconds=20,
    )
    if "Messages" not in response:
        return None, True
    messages = [message["Body"] for message in response["Messages"]]
    receipt_handles = [message["ReceiptHandle"] for message in response["Messages"]]
    print(f"{len(messages)} msgs received")
    for receipt_handle in receipt_handles:
        sqs.delete_message(QueueUrl=receive_queue, ReceiptHandle=receipt_handle)
        receipt_handles.remove(receipt_handle)
    print(f"{len(receipt_handles)} msgs deleted")
    return messages, False


def send_to_queue(queue_url, data, message_group_id):
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(data),
        MessageGroupId=message_group_id,
    )
    return response


# 10 msgs created
for i in range(10):
    sent_response = send_to_queue(
        receive_queue, {"key": i}, message_group_id=os.getenv("MESSAGE_GROUP_ID")
    )

# 10 msgs received
receive(str(uuid.uuid4()), 10)


# Now still 5 msgs are "inflight" and received again

队列配置

queue

我检查了之前的答案,要求我更改我设置的非零超时。

python amazon-web-services amazon-sqs
1个回答
0
投票

我尝试了你的代码,发现该程序执行以下操作:

  • 发送 10 条消息
  • 最多请求 10 条消息
  • 只收到5条消息

这是预期的行为。 receive_message() 的文档说:

短轮询是默认行为,其中在 ReceiveMessage 调用上对一组加权随机机器进行采样。因此,仅返回采样机器上的消息。

如果队列中的消息数量很少(少于 1,000 条),则每次 ReceiveMessage 调用您收到的消息很可能少于您请求的消息数量。如果队列中的消息数量极少,您可能不会收到任何消息特定的 ReceiveMessage 响应。如果发生这种情况,请重复请求。

这是 Amazon SQS 的分布式特性的结果,它允许其扩展以处理大量请求。

SQS architecture

消息存储在分布式服务器上,以实现冗余、高可用性和高性能。当发出

receive_message()

 调用时,请求会发送到一台服务器,而该服务器可能只有 
消息子集 可用。后续调用将返回所有消息,直到没有剩余消息为止。

如果队列中的消息数量较大,则您访问的服务器很可能会有 10 条以上消息,因此您不会注意到它只有一部分消息。

© www.soinside.com 2019 - 2024. All rights reserved.