Celery Redis后端 - 将队列中的任务作为项目存在

问题描述 投票:0回答:2

当前设置:芹菜在EC2节点上运行在docker容器上(带有我们产品的代码),创建和处理任务。我们的后端/经纪人是Redis,在AWS的弹性运行中运行。

目标:能够在任何给定时间查看队列大小(类似于花的监控),希望通过AWS CloudWatch,但不需要。任务的内容并不重要,因为我熟悉备份redis实例,并且可以使用本地工具解析备份以进行所需的任何分析。短期历史数据是首选(CloudWatch可以追溯到2周,并且具有1分钟数据点的粒度,这非常好)。

根据我对Flower的工作原理,由于我们目前拥有的安全组/限制数量,Flower无法使用。此外,只有当您在页面上时才会监控花,因此不会保存历史数据。

Elasticache已经内置了CloudWatch,用于redis中的项目数。在我看来,这是实现目标的最佳途径。但是,当前队列表示redis中的一个项目(无论队列中有多少任务)。以下是解析为json的redis备份示例:

[{
"1.api_data__cached_api_route.000":"{\"i1\": 0, \"i2\": 1, \"i3\": 0}",
"1.api_data__cached_api_route.001":"{\"i1\": 0, \"i2\": 0, \"i3\": 0}",
"1.api_data__cached_api_route.002":"{\"i1\": 1, \"i2\": 1, \"i3\": 0}",
"staging_event_default":["{\"id\":\"b28b056c-1268-4577-af8a-9f1948860502\", \"task\":{...}}, "{\"id\":\"52668c46-3972-457a-be3a-6e27eedd26e3\", \"task\":{...}}]
}]

Cloudwatch将此视为4个项目,3个缓存的api路由和1个队列。即使队列中有数千个项目,它仍然会显示为4个项目。 #(队列中的项目)和#(队列中的项目和其他缓存项目)之间的差异很好,因为这个监视工具将主要用于查看队列是否得到可怕的备份,并且队列的大小将相形见绌缓存的api路由数量。

要继续这条路线,最简单的答案是,如果芹菜有一个配置选项,让队列中的每个项目都是自己的redis项目。如果有一个简单的修复或配置选项,它似乎是最容易实现的。以下是我们当前的芹菜配置选项:

flask_app.config.update(
  CELERY_BROKER_URL=redis_host,
  CELERY_RESULT_BACKEND=redis_host,
  CELERY_QUEUES=queue_manager.queues,
  CELERY_ROUTES=queue_manager.routes,
  CELERY_DEFAULT_QUEUE=queue_manager.default_queue_name,
  CELERY_DEFAULT_EXCHANGE=queue_manager.default_exchange_name)

_celery = celery.Celery(flask_app.import_name,
  backend=flask_app.config['CELERY_RESULT_BACKEND'],
  broker=flask_app.config['CELERY_BROKER_URL'])

opts = {
  'broker_url': redis_host,
  'result_backed': redis_host,
  'task_queues': queue_manager.queues,
  'task_routes': queue_manager.routes,
  'task_default_queue': queue_manager.default_queue_name,
  'task_default_exchange': queue_manager.default_exchange_name,
  'worker_send_task_events': True,

  'task_ignore_result': True,
  'task_store_errors_even_if_ignored': True,
  'result_expires': 3600,

  'worker_redirect_stdouts': False,
  'beat_scheduler': redbeat.RedBeatScheduler,
  'beat_max_loop_interval': 5
}
_celery.conf.update(opts)

我遇到的另一个选择是celery-cloudwatch-logs,它似乎与我想要达到的目标一致,但似乎更多的目的是看到每个任务的特定内容,而不是总体上(但我可能是错了)。

如果没有完美/简单的解决方案可以达到目标,我会考虑分割芹菜云计算器来上传相关信息。我们的团队继承了目前存在的大部分代码,我对芹菜的工作方式有了基本的了解,但绝不是深入的。

提前感谢任何人的想法,意见和帮助!

amazon-web-services redis celery amazon-cloudwatch amazon-elasticache
2个回答
1
投票

如果有人碰巧碰到它,我会发布我在这里所做的事情。

我们已经安装了boto3并为应用程序中的其他位置配置了S3访问权限,因此很容易发布到CloudWatch。

我在Celery类中添加了一个方法,使用redis模块中的llen检查队列的长度:

 @staticmethod
  def check_lengths():
    result = {}
    for q in Celery._queues:
      result[q] = Celery._redis.llen(q)
    return result

然后发布到Cloudwatch也相当容易:

    namespace = "Celery/Queue"
    metrics = []
    for qname, qlen in data.items():
      metric = {}
      metric["MetricName"] = "ItemsInQueue"
      metric["Dimensions"] = [ {"Name": "QueueName", "Value": qname} ]
      metric["Value"] = qlen
      metric["Unit"] = "Count"

      metrics.append(metric)

    self.cw_client.put_metric_data(Namespace=namespace, MetricData=metrics)

然后,我最终使用AWS Lambda将分钟上的网络请求发送到端点,然后端点将上述数据发布到CloudWatch。


0
投票

要使用redis代理查看队列的队列长度,只需在redis中使用llen。例如,llen celery

© www.soinside.com 2019 - 2024. All rights reserved.