我有一个看起来像这样的架构:
精英的任务使用dockerized
Python的容器,并使用boto3
SQS client来检索和分析SQS消息:
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
while sqs_message is not None:
# Process it
# Delete if from the queue
# Get next message in queue
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
def get_sqs_task_data(queue_url):
client = boto3.client('sqs')
response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1
)
return response
def parse_sqs_message(response_sqs_message):
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None
# ... parse it and return a dict
return {
data_1 = ...,
data_2 = ...
}
总而言之,非常简单。
在get_sqs_data()
,我明确地指定我只想检索一个消息(因为1个ECS任务必须只能处理一个消息)。在parse_sqs_message()
,我测试是否有留在队列中有一些消息
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None
当在队列中只有一个消息(意思是一个精英任务已经触发),一切工作正常。精英的任务是能够挑选消息,并处理并将其删除。
然而,当队列中填充的同时X消息(X > 1
),X ECS任务被触发,但只有精英的任务是能够获取信息的一个并进行处理。
所有的人ECS任务将与No messages found in queue
退出,虽然有离开X - 1
消息进行处理。
这是为什么?为什么其他的任务无法领取离开的消息被拾起?
如果该事项,SQS的VisibilityTimeout
设置为30分钟。
任何帮助将大大感激!
随意问更多的精度,如果你想这样。
我不知道,了解任务是如何从SQS触发的,但是从我的SQS SDK文档中了解,如果使用轮询短消息时的数量少,这可能发生。从get_sqs_task_data
定义,我看到你正在使用短投票。
Short poll is the default behavior where a weighted random set of machines
is sampled on a ReceiveMessage call. Thus, only the messages on the
sampled machines are returned. If the number of messages in the queue
is small (fewer than 1,000), you most likely get fewer messages than you requested
per ReceiveMessage call.
If the number of messages in the queue is extremely small, you might not receive any messages in a particular ReceiveMessage response.
If this happens, repeat the request.
你可能想尝试使用长轮询具有超强的可见性超时值
我希望它能帮助