我创建了一个 FIFO 队列。 我发送了 10 条消息,收到它们并删除它们。但它们仍然在队列中并再次收到。我最初收到 10 条消息,然后是 5 条,然后是 2 条,然后是 0 条。“delete_message”响应也是 200。
代码:
import json
import os
import uuid
import boto3
from dotenv import load_dotenv
load_dotenv()
sqs = boto3.client("sqs", os.getenv("AWS_REGION"))
receive_queue = os.getenv("RECEIVE_QUEUE_URL")
def receive(attempt_id, max_num_messages):
response = sqs.receive_message(
QueueUrl=receive_queue,
ReceiveRequestAttemptId=attempt_id,
MaxNumberOfMessages=max_num_messages,
VisibilityTimeout=100,
WaitTimeSeconds=20,
)
if "Messages" not in response:
return None, True
messages = [message["Body"] for message in response["Messages"]]
receipt_handles = [message["ReceiptHandle"] for message in response["Messages"]]
print(f"{len(messages)} msgs received")
for receipt_handle in receipt_handles:
sqs.delete_message(QueueUrl=receive_queue, ReceiptHandle=receipt_handle)
receipt_handles.remove(receipt_handle)
print(f"{len(receipt_handles)} msgs deleted")
return messages, False
def send_to_queue(queue_url, data, message_group_id):
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(data),
MessageGroupId=message_group_id,
)
return response
# 10 msgs created
for i in range(10):
sent_response = send_to_queue(
receive_queue, {"key": i}, message_group_id=os.getenv("MESSAGE_GROUP_ID")
)
# 10 msgs received
receive(str(uuid.uuid4()), 10)
# Now still 5 msgs are "inflight" and received again
队列配置
我检查了之前的答案,要求我更改我设置的非零超时。
我尝试了你的代码,发现该程序执行以下操作:
这是预期的行为。 receive_message() 的文档说:
短轮询是默认行为,其中在 ReceiveMessage 调用上对一组加权随机机器进行采样。因此,仅返回采样机器上的消息。这是 Amazon SQS 的分布式特性的结果,它允许其扩展以处理大量请求。如果队列中的消息数量很少(少于 1,000 条),则每次 ReceiveMessage 调用您收到的消息很可能少于您请求的消息数量。如果队列中的消息数量极少,您可能不会收到任何消息特定的 ReceiveMessage 响应。如果发生这种情况,请重复请求。
消息存储在分布式服务器上,以实现冗余、高可用性和高性能。当发出
receive_message()
调用时,请求会发送到一台服务器,而该服务器可能只有消息子集 可用。后续调用将返回所有消息,直到没有剩余消息为止。 如果队列中的消息数量较大,则您访问的服务器很可能会有 10 条以上消息,因此您不会注意到它只有一部分消息。