I need to replace my Redis broker with an SQS broker, and while googling I found plenty of pages telling me how to use SQS with Celery. From what I understand, Celery creates its own SQS queue. I have only one task, and I want to use an already-created SQS queue instead.
A commit added the ability to use predefined queues. You should be able to use a predefined queue by adding the predefined_queues option to CELERY_BROKER_TRANSPORT_OPTIONS:
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'predefined_queues': {
        'HIGH_PRIORITY': {
            'url': 'https://sqs.ap-south-1.amazonaws.com/030221/HGH_PRIORITY',
            'access_key_id': config('AWS_ACCESS_KEY'),
            'secret_access_key': config('AWS_SECRET_KEY'),
        },
    }
}
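With the queue defined this way, tasks still have to be routed to it. A minimal sketch of such routing, where the task name 'myapp.tasks.process' is an assumption for illustration:

```python
# Hypothetical routing: send our single task to the predefined queue.
# The queue name must match the key used in 'predefined_queues' above.
CELERY_TASK_ROUTES = {
    'myapp.tasks.process': {'queue': 'HIGH_PRIORITY'},
}
```

The worker then needs to consume from that queue as well, e.g. `celery -A myapp worker -Q HIGH_PRIORITY` (module name assumed).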
Here is the documentation update that came with the commit:
Other Features supported by this transport:
Predefined Queues:
The default behavior of this transport is to use a single AWS credential
pair in order to manage all SQS queues (e.g. listing queues, creating
queues, polling queues, deleting messages).
If it is preferable for your environment to use multiple AWS credentials, you
can use the 'predefined_queues' setting inside the 'transport_options' map.
This setting allows you to specify the SQS queue URL and AWS credentials for
each of your queues. For example, if you have two queues which both already
exist in AWS, you can tell this transport about them as follows:
transport_options = {
    'predefined_queues': {
        'queue-1': {
            'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
            'access_key_id': 'a',
            'secret_access_key': 'b',
        },
        'queue-2': {
            'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
            'access_key_id': 'c',
            'secret_access_key': 'd',
        },
    }
}
broker_url = f"sqs://{aws_access_key}:{aws_secret_key}@"
result_backend = 'file://results'  # or whatever option except 'rpc'
task_default_queue = "my_super_queue"
broker_transport_options = {
    'visibility_timeout': 100,  # you decide this number
    'region': 'us-west-2',      # don't forget this
}
Remember to fill in the credentials of the user you log in with, and remember to grant that user the right permissions (in my case I gave it AmazonSQSFullAccess). By providing the credentials (access key and secret key), you don't need to specify any URL in broker_url: connecting with those credentials gives you access to the list of SQS queues. Celery will try to use an existing queue with the name given in task_default_queue, and if it can't find one, it will create it. I didn't specify a value for queue_name_prefix here (inside broker_transport_options), but if you do, the final name of the queue that is used (or created) will be the concatenation of queue_name_prefix followed by task_default_queue. Note that if the queue being created is an SQS FIFO queue, its name must end with ".fifo", so in this example it would be my_super_queue.fifo.
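To make the naming rule concrete, here is a small sketch of the concatenation described above (the prefix value "dev-" is a made-up assumption):

```python
# Hypothetical settings; only the naming rule is being illustrated.
queue_name_prefix = "dev-"
task_default_queue = "my_super_queue"

# The queue Celery will look up (or create) is prefix + default queue name.
final_queue_name = f"{queue_name_prefix}{task_default_queue}"
print(final_queue_name)  # dev-my_super_queue

# For an SQS FIFO queue the name must end with '.fifo'.
fifo_queue_name = f"{queue_name_prefix}my_super_queue.fifo"
print(fifo_queue_name)  # dev-my_super_queue.fifo
```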
from celery import Celery


def make_celery(app):
    celery = Celery(
        app.import_name,
        broker="sqs://",
        broker_transport_options={
            "queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
        },
    )
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
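The factory above can then be wired to a Flask app. A hypothetical usage sketch (it assumes Flask is installed and that the {SERVICE_ENV}/{SERVICE_NAME} placeholders are filled in before use):

```python
from flask import Flask

app = Flask(__name__)
celery = make_celery(app)

@celery.task
def add(x, y):
    # Runs inside app.app_context() thanks to ContextTask.
    return x + y
```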