我有一个简单的任务需要第三方。 当请求到来时,我将其推送到
amazon sqs
队列,将其拉入工作人员并调用 3rd party
。如果超时,我想实现指数退避(在 2 秒内重试,然后是 4 秒,然后是 8 秒,然后......)
最大重试次数。
使用
python
、boto -> sqs
我一直在寻找内置参数,以允许我用尽可能少的代码来完成此操作(理想情况下,根本不需要代码)。
类似的东西
from boto import sqs
def handle_message(message):
try:
# send a post to api
except TimeOut, err:
# send_back_to_itself in 2/4/8 sec
if delay < DELAY_LIMIT:
queue.write(message, delay=secs)
我认为从 2021 年夏季开始,“完全没有代码”是不可能的。但是有一篇很棒的博客文章介绍了如何通过编码示例来做到这一点https://aws.amazon.com/blogs/compute/using -amazon-sqs-死信队列至重播消息/
上面链接中的代码示例:
def handler(event, context):
"""Lambda function handler."""
for record in event['Records']:
nbReplay = 0
# number of replay
if 'sqs-dlq-replay-nb' in record['messageAttributes']:
nbReplay = int(record['messageAttributes']['sqs-dlq-replay-nb']["stringValue"])
nbReplay += 1
if nbReplay > config.MAX_ATTEMPS:
raise MaxAttempsError(replay=nbReplay, max=config.MAX_ATTEMPS)
# SQS attributes
attributes = record['messageAttributes']
attributes.update({'sqs-dlq-replay-nb': {'StringValue': str(nbReplay), 'DataType': 'Number'}})
_sqs_attributes_cleaner(attributes)
# Backoff
b = backoff.ExpoBackoffFullJitter(base=config.BACKOFF_RATE, cap=config.MESSAGE_RETENTION_PERIOD)
delaySeconds = b.backoff(n=int(nbReplay))
# SQS
SQS.send_message(
QueueUrl=config.SQS_MAIN_URL,
MessageBody=record['body'],
DelaySeconds=int(delaySeconds),
MessageAttributes=record['messageAttributes']
)
根据 AWS SDK 文档,如果您使用官方 SDK 库之一,您可以免费获得指数退避。所以看来你需要做的就是设置你的
max_attempts
计数,第一次尝试之后的所有事情都会呈指数级下降:
import boto3
from botocore.config import Config
config = Config(
retries = dict(
max_attempts = 10 # docs say default is 5
)
)
sqs = boto3.client('sqs', config=config)
sqs.receive_message(QueueUrl='https://sqs.us-east-1.amazonaws.com/<your-account-num>/foobar')