芹菜和定制消费者

问题描述 投票:0回答:4

据我所知,Celery 既充当消息的生产者又充当消息的消费者。这不是我想要达到的目标。我希望 Celery 仅充当消费者,根据我发送到我选择的 AMQP 代理的消息来触发某些任务。这可能吗?

或者我需要在我的堆栈中添加胡萝卜来煮汤吗?

python rabbitmq celery amqp
4个回答
6
投票

Celery 经纪人充当消息存储并将其发布给订阅这些消息的一个或多个工作人员,

所以:celery 将消息推送到代理(rabbitmq、redist、celery 本身通过 django db 等)。这些消息由工作人员按照代理协议检索,并记住它们(通常它们是持久的,但可能取决于在你的经纪人身上),并被你们工人处决了。

任务结果在正在执行的辅助任务上可用,您可以配置存储这些结果的位置,并且可以使用此方法检索它们。

您可以使用 celery 将参数传递给您的“接收器函数”来发布任务(您定义的任务,文档中有一些示例,通常您不想在这里传递大的东西(比如查询集),而只想传递最少的信息这允许您在执行任务时检索所需的内容。

一个简单的例子是:

您注册了一个任务

@task
def add(x,x):
    return x+y

然后您从另一个模块调用:

from mytasks import add

metadata1 = 1
metadata2 = 2
myasyncresult = add.delay(1,2)
myasyncresult.get() == 3

编辑

编辑后,我看到您可能想从芹菜以外的其他来源构建消息,您可以看到here消息格式,它们默认为遵循该格式的腌制对象,因此您将这些消息发布到正确的队列中你的rabbitmq经纪人和你从你的工人那里取回它们是正确的。


1
投票

Celery 使用消息代理架构模式。许多实现/代理传输都可以与 Celery 一起使用,包括 RabbitMQDjango 数据库

来自维基百科

消息代理是一种用于消息验证、消息转换和消息路由的架构模式。它调解应用程序之间的通信,最大限度地减少应用程序之间为了能够交换消息而应该相互了解的程度,从而有效地实现解耦。

保存结果是可选的,并且需要结果后端。您可以使用不同的代理和结果后端。 Celery 入门 指南包含更多信息。

您问题的答案是,您可以触发传递参数的特定任务,而无需添加Carrot


1
投票

Celery Custom Consumer将是3.1v中发布的功能,目前正在开发中,您可以阅读https://docs.celeryq.dev/en/stable/userguide/extending.html了解它。


0
投票

为了使用来自 celery 的消息,您需要创建 celery 可以使用的消息。您可以按如下方式创建 celery 消息:-

def get_celery_worker_message(task_name,args,kwargs,routing_key,id,exchange=None,exchange_type=None):
    
    message=(args, kwargs, None)

    application_headers={
        'lang': 'py',
        'task': task_name,
        'id':id,
        'argsrepr': repr(args),
        'kwargsrepr': repr(kwargs)
        #, 'origin': '@'.join([os.getpid(), socket.gethostname()])
    }
    properties={
        'correlation_id':id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }


    body, content_type, content_encoding = prepare(
            message, 'json', 'application/json', 'utf-8',None, application_headers)

    prep_message = prepare_message(body,None,content_type,content_encoding,application_headers,properties)
    

    inplace_augment_message(prep_message, exchange, exchange_type, routing_key,id)

    # dump_json = json.dumps(prep_message)

    # print(f"json encoder:- {dump_json}")

    return prep_message

您需要首先根据消费者定义序列化器、content_type、content_encoding、压缩、标头来准备消息。

def prepare( body, serializer=None, content_type=None,
                 content_encoding=None, compression=None, headers=None):

        # No content_type? Then we're serializing the data internally.
        if not content_type:
            serializer = serializer
            (content_type, content_encoding,
             body) = dumps(body, serializer=serializer)
        else:
            # If the programmer doesn't want us to serialize,
            # make sure content_encoding is set.
            if isinstance(body, str):
                if not content_encoding:
                    content_encoding = 'utf-8'
                body = body.encode(content_encoding)

            # If they passed in a string, we can't know anything
            # about it. So assume it's binary data.
            elif not content_encoding:
                content_encoding = 'binary'

        if compression:
            body, headers['compression'] = compress(body, compression)

        return body, content_type, content_encoding

def prepare_message( body, priority=None, content_type=None,
                        content_encoding=None, headers=None, properties=None):
        """Prepare message data."""
        properties = properties or {}
        properties.setdefault('delivery_info', {})
        properties.setdefault('priority', priority )

        return {'body': body,
                'content-encoding': content_encoding,
                'content-type': content_type,
                'headers': headers or {},
                'properties': properties or {}}

创建消息后,您需要添加参数以使其可供 celery 消费者读取。

def inplace_augment_message(message, exchange,exchange_type, routing_key,next_delivery_tag):
    body_encoding_64 = 'base64'

    message['body'], body_encoding = encode_body(
        str(json.dumps(message['body'])), body_encoding_64
    )
    props = message['properties']
    props.update(
        body_encoding=body_encoding,
        delivery_tag=next_delivery_tag,
    )
    if exchange and exchange_type:
        props['delivery_info'].update(
            exchange=exchange,
            exchange_type=exchange_type,
            routing_key=routing_key,
        )
    elif exchange:
        props['delivery_info'].update(
            exchange=exchange,
            routing_key=routing_key,
        )
    else:
        props['delivery_info'].update(
            exchange=None,
            routing_key=routing_key,
        )

class Base64:

    """Base64 codec."""

    def encode(self, s):
        return bytes_to_str(base64.b64encode(str_to_bytes(s)))

    def decode(self, s):
        return base64.b64decode(str_to_bytes(s))

def encode_body( body, encoding=None):
    codecs = {'base64': Base64()}
    if encoding:
        return codecs.get(encoding).encode(body), encoding
    return body, encoding
© www.soinside.com 2019 - 2024. All rights reserved.