如何在celery中指定SQS队列名称

问题描述 投票:0回答:5

我需要用

redis
经纪人替换我的
SQS
经纪人,在谷歌搜索时我发现了很多页面,告诉我如何将
SQS
celery
一起使用。根据我的理解,它创建自己的 SQS 队列,我只有一项任务,想要使用已经创建的 SQS 队列。

python asynchronous celery amazon-sqs celery-task
5个回答
8
投票

默认情况下,celery 将使用“队列前缀”设置(如果已定义)为您创建一个新队列。 但是,如果您想使用现有队列,则可以通过

task-default-queue

 设置提供名称。在这种情况下,请确保您没有定义上面提到的队列前缀。


2
投票
broker_transport_options

(在 celery 4.0 中)设置队列名称,例如:


broker_transport_options = {"queue_name_prefix": "my-queue-"}

文档在
这里


2
投票

提交增加了使用预定义队列的能力。 您应该能够通过将预定义队列选项添加到 CELERY_BROKER_TRANSPORT_OPTIONS 来使用预定义队列

CELERY_BROKER_TRANSPORT_OPTIONS={ 'predefined_queues':{ 'HIGH_PRIORITY': { 'url': 'https://sqs.ap-south-1.amazonaws.com/030221/HGH_PRIORITY', 'access_key_id': config('AWS_ACCESS_KEY'), 'secret_access_key': config('AWS_SECRET_KEY'), }, } }

以下是提交后的文档更新 -

Other Features supported by this transport: Predefined Queues: The default behavior of this transport is to use a single AWS credential pair in order to manage all SQS queues (e.g. listing queues, creating queues, polling queues, deleting messages). If it is preferable for your environment to use a single AWS credential, you can use the 'predefined_queues' setting inside the 'transport_options' map. This setting allows you to specify the SQS queue URL and AWS credentials for each of your queues. For example, if you have two queues which both already exist in AWS) you can tell this transport about them as follows: transport_options = { 'predefined_queues': { 'queue-1': { 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa', 'access_key_id': 'a', 'secret_access_key': 'b', }, 'queue-2': { 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb', 'access_key_id': 'c', 'secret_access_key': 'd', }, } }



1
投票

broker_url = f"sqs://{aws_access_key}:{aws_secret_key}@" result_backend = 'file://results' # Or whatever option but 'rpc' task_default_queue = "my_super_queue" broker_transport_options = { 'visibility_timeout': 100, # YOU DECIDE THIS NUMBER 'region': 'us-west-2', # DON'T FORGET THIS }

请记住输入您登录用户的凭据。
还要记住向该用户授予正确的权限(在我的例子中,我给了它 AmazonSQSFullAccess)

通过提供凭据(访问权限和密钥),您无需在

broker_url

中指定任何 url。这是因为当使用给定的凭据连接时,您可以访问 SQS 队列的列表。它将尝试使用task_default_queue中指定的现有队列,如果找不到它,它将创建它。 我没有在此处指定

queue_name_prefix

的值(在 broker_transport_options 内),但如果您这样做,要使用(或创建)的队列的最终名称将是 queue_name_prefix 和后跟 的串联任务默认队列 考虑一下,如果创建的队列是

SQS FIFO

队列,则它必须以“.fifo”结尾,因此在本例中它将是 my_super_queue.fifo


0
投票

from celery import Celery def make_celery(app): celery = Celery( app.import_name, broker="sqs://", broker_transport_options={ "queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-" }, ) task_base = celery.Task class ContextTask(task_base): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return task_base.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery

© www.soinside.com 2019 - 2024. All rights reserved.