Celery、RabbitMQ 在执行任务时从消费者列表中删除工人

问题描述 投票:0回答:1

我已经启动了我的 celery Worker,它使用 RabbitMQ 作为代理,如下所示:

celery -A my_app worker -l info -P gevent -c 100 --prefetch-multiplier=1 -Q my_app

然后我的任务看起来很像这样:

@shared_task(queue='my_app', default_retry_delay=10, max_retries=1, time_limit=8 * 60)
def example_task():
    # getting queryset with some filtering
    my_models = MyModel.objects.filter(...)

    for my_model in my_models.iterator():
        my_model.execute_something()

有时此任务可以在不到一分钟的时间内完成,有时在高负载期间,需要超过 5 分钟才能完成。

主要问题是 RabbitMQ 不断地将我的工作人员从消费者列表中删除。看起来真的很随机。因此,我需要再次重新启动工作程序。

工作人员也开始抛出这些错误:

SSLEOFError: EOF occurred in violation of protocol (_ssl.c:2396)

有时会出现这些错误:

consumer: Cannot connect to amqps://my_app:**@example.com:5671/prod: SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:997)').

Couldn't ack 2057, reason:"RecoverableConnectionError(None, 'connection already closed', None, '')"

我尝试添加

--without-heartbeat
但没有任何作用。

如何解决这个问题?有时我的任务需要 30 多分钟才能完成,而且我无法持续监控工人是否被从rabbitmq踢出。

python django rabbitmq celery
1个回答
0
投票

对于将来遇到这些问题的任何人来说,这似乎是 RabbitMQ 的问题。我尝试了 Redis,问题就消失了。不过,我想坚持使用 RabbitMQ 进行生产,所以我尝试了 LavinMQ,这是一个替代品,问题就消失了。

© www.soinside.com 2019 - 2024. All rights reserved.