避免 celery Broker 中的重复任务

问题描述 投票:0回答:5

我想使用 celery 配置 pi 创建以下流程:

  • 仅当 celery 队列没有已挂起的 TaskA(argB) 时才发送 TaskA(argB)

可能吗?怎么办?

celery
5个回答
9
投票

您可以通过某种记忆方式让您的工作了解其他任务。如果您使用缓存控制键(redis,memcached,/tmp,任何方便的),您可以使执行依赖于该键。我以 redis 为例。

from redis import Redis

@app.task
def run_only_one_instance(params):
    try:
        sentinel =  Redis().incr("run_only_one_instance_sentinel")
        if sentinel == 1:
            #I am the legitimate running task
            perform_task()
        else:
            #Do you want to do something else on task duplicate?
            pass
        Redis().decr("run_only_one_instance_sentinel")
    except Exception as e:
        Redis().decr("run_only_one_instance_sentinel")
        # potentially log error with Sentry?
        # decrement the counter to insure tasks can run
        # or: raise e

1
投票

我想不出什么办法,只能

  1. 通过

    celery inspect

  2. 检索所有正在执行和计划的任务
  3. 迭代它们以查看您的任务是否存在。

检查thisSO问题,看看第一点是如何完成的。

祝你好运


1
投票

我的答案是假设

  1. 同一个作业运行多次
  2. 原因可能是无法确认消息
  3. 消费者不知何故断开连接,未确认的消息已准备好

解决方案

  1. 首先将
    bind=True
    传递给任务,以便我们可以获取任务 uuid 为
    self.request.id
from redis import Redis

@app.task(bind=True)
def your_task(self, *args, **kwargs):
  task_id = self.request.id
  # check if already executed 
  redis_conn = Redis.from_url(redis_url, charset="utf-8", decode_responses=True)
  if redis_conn.get(task_id):
    logger.info(f"{task_id} is a duplicate job")
    raise Exception('Duplicate job') # Handle this accordingly

  # expire in 1 day
  ex_1_day = 24*60*60
  redis_conn.set(task_id, 1, ex=ex_1_day)
  # From here run your job now
  # ...

这样,它将确保您的作业永远不会并行运行,也不会重复顺序运行


1
投票

我不知道它会比其他答案对你有更多帮助,但我的方法是遵循 srj 给出的相同想法。我需要一种方法来阻止我的服务器启动具有相同 id 的任务来排队。所以我做了一个通用函数来帮助我。

    def is_task_active_or_registered(app, task_id):

        i = app.control.inspect()

        active_dict = i.active()
        scheduled_dict = i.scheduled()
        keys_set = set(active_dict.keys() + scheduled_dict.keys())
        tasks_ids_set = set()

        for _dict in [active_dict, scheduled_dict]:
            for k in keys_set:
                for task in _dict[k]:
                    tasks_ids_set.add(task['id'])

        if task_id in tasks_ids_set:
            return True
        else:
            return False

所以,我这样使用它:

在我的 celery-app 对象可用的上下文中,我定义:

    def check_task_can_not_run(task_id):
        return is_task_active_or_registered(app=celery, task_id=task_id)

因此,根据我的客户请求,我将此称为

check_task_can_not_run(...)
并阻止任务在
True
的情况下启动。


0
投票

我也面临着类似的问题。 The Beat 在我的队列中重复播放。我想使用

expires
但此功能无法正常工作 https://github.com/celery/celery/issues/4300.

这里是调度程序,它检查任务是否已经排队(基于任务名称)。

# -*- coding: UTF-8 -*-
from __future__ import unicode_literals

import json
from heapq import heappop, heappush

from celery.beat import event_t
from celery.schedules import schedstate
from django_celery_beat.schedulers import DatabaseScheduler
from typing import List, Optional
from typing import TYPE_CHECKING

from your_project import celery_app

if TYPE_CHECKING:
    from celery.beat import ScheduleEntry


def is_task_in_queue(task, queue_name=None):
    # type: (str, Optional[str]) -> bool
    queues = [queue_name] if queue_name else celery_app.amqp.queues.keys()

    for queue in queues:
        if task in get_celery_queue_tasks(queue):
            return True
    return False


def get_celery_queue_tasks(queue_name):
    # type: (str) -> List[str]
    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        task = j['headers']['task']
        if task not in decoded_tasks:
            decoded_tasks.append(task)

    return decoded_tasks


class SmartScheduler(DatabaseScheduler):
    """
    Smart means that prevents duplicating of tasks in queues.
    """
    def is_due(self, entry):
        # type: (ScheduleEntry) -> schedstate
        is_due, next_time_to_run = entry.is_due()

        if (
            not is_due or  # duplicate wouldn't be created
            not is_task_in_queue(entry.task)  # not in queue so let it run
        ):
            return schedstate(is_due, next_time_to_run)

        # Task should be run (is_due) and it is present in queue (is_task_in_queue)

        H = self._heap
        if not H:
            return schedstate(False, self.max_interval)

        event = H[0]
        verify = heappop(H)
        if verify is event:
            next_entry = self.reserve(entry)
            heappush(H, event_t(self._when(next_entry, next_time_to_run), event[1], next_entry))
        else:
            heappush(H, verify)
            next_time_to_run = min(verify[0], next_time_to_run)

        return schedstate(False, min(next_time_to_run, self.max_interval))
© www.soinside.com 2019 - 2024. All rights reserved.