在celery中,当任务入队时,将上下文元数据从发送者进程传递给worker的适当方法是什么?

问题描述 投票:3回答:1

当任何celery任务入队时,我想添加工作人员能够使用的上下文元数据。

以下代码示例有效,但我希望有一个合适的芹菜式解决方案。

from celery.signals import before_task_publish, task_prerun

@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
    task_kwags = body[1]
    metadata = {"foo": "bar"}
    task_kwags['__metadata__'] = metadata

@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
    metadata = kwargs['kwargs'].pop('__metadata__', {})
    # metadata == {"foo": "bar"}
celery django-celery celery-task
1个回答
3
投票

当一个任务在工人中开始时,before_task_publishheader的内容位于**kwargspush_request中。

celery/app/tasks.py:1000

    def push_request(self, *args, **kwargs):
        self.request_stack.push(Context(*args, **kwargs))

Context的构造函数中完成了一些不错的事情。 self.__dict__.update()意味着我们可以像Context(metadata={'foo': 'bar'}).metadata一样访问这些值

celery/app/tasks.py:99

class Context(object)
# ...
    def __init__(self, *args, **kwargs):
        self.update(*args, **kwargs)

    def update(self, *args, **kwargs):
        return self.__dict__.update(*args, **kwargs)

可以从Taskrequest财产访问任务上下文。

celery/app/tasks.py:1019

class Task(object):
# ...
    def _get_request(self):
        """Get current request object."""
        req = self.request_stack.top
        if req is None:
            # task was not called, but some may still expect a request
            # to be there, perhaps that should be deprecated.
            if self._default_request is None:
                self._default_request = Context()
            return self._default_request
        return req
    request = property(_get_request)

这意味着最终的解决方案就是:

from celery.signals import before_task_publish, task_prerun

@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
    metadata = {"foo": "bar"}
    headers['__metadata__'] = metadata

@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
    metadata = getattr(task.request, '__metadata__', {}) 
    # metadata == {"foo": "bar"}

注意:task.request.__metadata__也可以工作,但如果任务在信号集成之前排队,它就会失败。这样更安全。

© www.soinside.com 2019 - 2024. All rights reserved.