我是 AWS SQS 的新手。我正在尝试实现一个简单的服务器到节点 SQS 服务,其中多个节点向服务器发送一些信息(一些关于自身的标识),服务器需要将其放入列表中并迭代删除消息。我有以下代码:
sqs_client = boto3.client("sqs",aws_access_key_id=akey,aws_secret_access_key=skey,region_name=region)
#Queue URL
queue=r'https://ap-south-1.queue.amazonaws.com/blahblah/SQS_client_list'
response = sqs_client.receive_message(
QueueUrl=queue,
AttributeNames=['SentTimestamp'],
MaxNumberOfMessages=10,
MessageAttributeNames=['All'],
VisibilityTimeout=2,
WaitTimeSeconds=2
)
message=response.get("Messages")
#Information should go into this list
node_list=[]
#Function to delete messages
#I need to delete the message after passing into the list "node_list"
def get_node_list():
message_body = message[0]["Body"]
node_list.append(message_body)
sqs_client.delete_message(QueueUrl=queue, ReceiptHandle=response['Messages'][0]["ReceiptHandle"])
time.sleep(3)
return node_list
# Loop to delete all messages
while(True):
if len(response.get('Messages', [])) !=0:
time.sleep(3)
get_node_list()
if len(response.get('Messages', [])) ==0:
break
print(node_list)
问题是,当多个节点将其信息推送到同一个队列时,此代码会运行无限循环,并且 node_list 会无限次追加,而 while 循环不会删除 SQS 消息。代码中可能有什么问题?
您的代码应如下所示:
while(True):
# Get messages
response = sqs_client.receive_message(WaitTimeSeconds=20,...)
# Loop through messages
for message in response['Messages']:
# process message here
# Delete the message
sqs_client.delete_message(QueueUrl=queue, ReceiptHandle=message['ReceiptHandle'])
我假设您希望它能够不断处理新消息,因此外循环会永远循环。
拨打
receive_messages()
时,我建议您设置WaitTimeSeconds=20
。这意味着如果队列中没有消息,它将等待最多 20 秒才返回。然而,一旦出现消息,它就会返回,无需等待 20 秒。这减少了对 AWS 的 API 调用数量。
我注意到您指定了
MaxNumberOfMessages=10
,这意味着可能会收到多条消息。因此,代码需要循环遍历提供的每条消息。
处理完消息后,可以调用
delete_message()
。或者,您可以将所有消息句柄添加到列表中,并在内循环之后调用 delete_message_batch()
。
当 SQS 可见性超时小于 Lambda/消费者的处理时间时,就会发生这种情况。