将 celery 任务的结果链接到分布式组中

问题描述 投票:0回答:3

就像这个其他问题一样,我想从芹菜任务返回的列表中创建一个芹菜组。这个想法是第一个任务将返回一个列表,第二个任务将该列表分解为列表中每个项目的并发任务。

计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是下载页面、处理它然后上传到 s3 的链。最后,完成所有子页面后,该网站将在我们的数据库中标记为已完成。像这样的东西:

chain(
    get_links_from_website.si('https://www.google.com'),
    dmap.s(  # <-- Distributed map
        download_sub_page.s() | 
        process_sub_page.s() | 
        upload_sub_page_to_s3.s()
    ),
    mark_website_done.s()
)

到目前为止我看到的解决方案似乎做得很好,但是当第二个任务是链时失败,因为

clone
没有做深度复制的问题(有关详细信息,请参阅对此答案的评论 ):

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

还有一个问题是,如果可迭代对象的长度为 10,000 个项目,它将创建一个包含 10,000 个项目的组。正如您所想象的那样,这正在炸毁我们的内存使用量。

所以,我正在寻找的是一种方法来做

dmap

  • 不会通过创建巨大的组来炸毁 RAM(也许有一种方法可以对可迭代对象进行分块?)
  • 在 celery 链上工作,没有 deepcopy 问题。
python celery chain
3个回答
4
投票

celery canvas 提供了 chunks 来将任务分成块。不幸的是,这不适用于链、组等原语。

您可以使用芹菜信号来防止 dmap/clone 出现问题。

ch = chain(
    download_sub_page.s(),
    process_sub_page.s(),
    upload_sub_page.s(),
)

@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, headers=None, body=None, **kwargs):
    result = kwargs['result']    
    header = [ch(i) for i in result]
    callback = mark_website_done.si()
    chord(header)(callback)

创建处理页面的链,并使用弦将最后一个任务挂接到它。只要

get_links_from_website
成功运行,就会执行此函数。

根据连锁所花费的时间,您还可以将

get_links_from_website
的结果保存在某处。然后迭代其中的一批以排队链,并且对于最后一批,您可以挂钩最后一个任务的回调。


1
投票

这有点 hacky 但我们正在使用 deepcopy 来克隆回调,这修复了 Signature 的浅拷贝的错误

def dmap(it, callback, final=None):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)

    run_in_parallel = group(subtask(copy.deepcopy(dict(callback))).clone([arg, ]) for arg in it)

    if len(run_in_parallel.tasks) == 0:
        return []

    if final:
        return chord(run_in_parallel)(final)

    return run_in_parallel.delay()

请注意,这仅适用于一个嵌套级别(即回调是一个链/组/弦),但不适用于深层嵌套的回调

对于深度嵌套的回调图,我们使用这个 hack,它有点慢但工作完美

# Hack to completely clone a signature with possibly complex subtasks (chains, chords, etc...)
run_in_parallel = group(pickle.loads(pickle.dumps(callback)).clone([arg, ]) for arg in it)

对于组的大小,您总是可以将迭代器拆分为块


0
投票

如果有人遇到这个问题,Jether 的回答很有帮助,但并不完美。对我们来说,存在三个问题:

  1. 如果
    callback
    本身是一条链,则答案不会将参数传递到链上。 https://stackoverflow.com/a/59023231/19882725 通过
    clone_signature
    帮助提供解决方案。这似乎适用于使用 RabbitMQ 作为代理的合理嵌套链,但我们没有尝试任何极端的东西(因此不需要调整它来使用
    pickle
    )。
  2. 如果
    callback
    是一个组或和弦,我们需要将参数应用于每个克隆的任务,因此我们修改了 (1) 中的
    clone_signature
    以适应这种情况。
  3. 添加 (1) 后,传递
    final
    失败了 - 我们采用了 https://github.com/celery/celery/issues/5265 的解决方案,将 final 从
    dict
    转换为
    Signature
    .
  4. 最后,我们发现
    final
    在很多情况下实际上不会执行,因为
    chord
    收到的是
    Group
    而不是任务列表。

对于任何好奇的人,这是我们的最终解决方案:

import copy

from celery import Signature, chord, group, shared_task, subtask


def clone_signature(sig, args=(), kwargs=(), **opts):
    """
    Turns out that a chain clone() does not copy the arguments properly - this
    clone does.
    From: https://stackoverflow.com/a/53442344/3189
    """
    if sig.subtask_type and sig.subtask_type not in ["chain", "group", "chord"]:
        raise NotImplementedError(
            "Cloning only supported for tasks, chains, groups, and chords, not {}".format(
                sig.subtask_type
            )
        )
    clone = sig.clone()
    # if the task we're cloning is a group or chord, apply the arguments to each of the children
    if sig.subtask_type and sig.subtask_type in ["group", "chord"]:
        clone.tasks = [
            clone_signature(task, args=args, kwargs=kwargs, opts=opts)
            for task in clone.tasks
        ]
    # otherwise, apply the arguments to either the task itself (if it's a single task)
    # or the first child task (if it's a chain)
    else:
        if hasattr(clone, "tasks"):
            task_to_apply_args_to = clone.tasks[0]
        else:
            task_to_apply_args_to = clone
        args, kwargs, opts = task_to_apply_args_to._merge(
            args=args, kwargs=kwargs, options=opts
        )
        task_to_apply_args_to.update(
            args=args, kwargs=kwargs, options=copy.deepcopy(opts)
        )
    return clone


@shared_task
def dmap(it, callback, final=None):
    if not len(it):
        return []

    callback = subtask(callback)
    run_in_parallel = [
        clone_signature(callback, args if type(args) is list else [args]) for args in it
    ]

    if not final:
        return group(*run_in_parallel).delay()

    # see https://github.com/celery/celery/issues/5265
    if not isinstance(final, Signature):
        final["immutable"] = True
        final = Signature.from_dict(final)
    return chord(run_in_parallel)(final)

这使我们能够成功执行如下嵌套的

dmap
s:

chain(
    taskA.s(),
    dmap.s(
        chain(
            taskB.s(),
            taskC.s(),
            dmap.s(
                taskD.s(),
                final=chain(
                    taskE.s(),
                    taskF.s(),
                ),
            ),
        ),
    ),
).delay()
© www.soinside.com 2019 - 2024. All rights reserved.