任何更改时 Celery 自动重新加载

问题描述 投票:0回答:9

CELERY_IMPORTS
中的
settings.py
中的模块发生更改时,我可以使 celery 自动重新加载。

我尝试让母模块检测子模块的更改,但它没有检测到子模块的更改。这让我明白检测不是由 celery 递归完成的。我在文档中搜索了它,但没有找到针对我的问题的任何回复。

将项目中所有与 celery 相关的部分添加到

CELERY_IMPORTS
来检测变化真的很困扰我。

有没有办法告诉celery“当项目的任何地方有任何变化时自动重新加载”。

谢谢您!

python celery django-celery
9个回答
38
投票

Celery

--autoreload
不起作用,它已被 弃用

既然你使用的是django,你可以为此编写一个管理命令。 Django 有 autoreload 实用程序,runserver 使用它在代码更改时重新启动 WSGI 服务器。

相同的功能可用于重新加载 celery 工作线程。创建一个名为 celery 的单独管理命令。编写一个函数来杀死现有的工作人员并启动一个新的工作人员。现在将此函数挂接到自动重新加载,如下所示。

import shlex
import subprocess

from django.core.management.base import BaseCommand
from django.utils import autoreload


def restart_celery():
    cmd = 'pkill celery'
    subprocess.call(shlex.split(cmd))
    cmd = 'celery worker -l info -A foo'
    subprocess.call(shlex.split(cmd))


class Command(BaseCommand):

    def handle(self, *args, **options):
        print('Starting celery worker with autoreload...')

        # For Django>=2.2
        autoreload.run_with_reloader(restart_celery) 

        # For django<2.1
        # autoreload.main(restart_celery)

现在您可以使用

python manage.py celery
运行 celery Worker,它将在代码库更改时自动重新加载。

这仅用于开发目的,请勿在生产中使用。代码取自我的其他答案


22
投票

您可以使用watchmedo

pip install watchdog

通过watchmedo间接启动celeryworker

watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery worker --app=worker.app --concurrency=1 --loglevel=INFO

更详细


18
投票

您可以使用

-I|--include
手动包含其他模块。将其与
find
awk
等 GNU 工具结合使用,您将能够找到所有
.py
文件并包含它们。

$ celery -A app worker --autoreload --include=$(find . -name "*.py" -type f | awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' | sed 's/.$//')

让我们解释一下:

find . -name "*.py" -type f

find
递归搜索包含
.py
的所有文件。输出看起来像这样:

./app.py
./some_package/foopy
./some_package/bar.py

然后:

awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=','

此行将

find
的输出作为输入,并删除所有出现的
./
。然后它将所有
/
替换为
.
。最后一个
sub()
删除并用空字符串替换
.py
ORS
将所有换行符替换为
,
。输出:

app,some_package.foo,some_package.bar,

最后一个命令

sed
删除最后一个
,

因此正在执行的命令如下所示:

$ celery -A app worker --autoreload --include=app,some_package.foo,some_package.bar

如果源代码中有

virtualenv
,您可以通过添加
-path .path_to_your_env -prune -o
:

来排除它
$ celery -A app worker --autoreload --include=$(find . -path .path_to_your_env -prune -o -name "*.py" -type f | awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' | sed 's/.$//')

3
投票

我使用了

watchdog
watchdemo
实用程序,它工作得很好,但由于某种原因,PyCharm 调试器无法调试由
watchdemo
生成的子进程。

因此,如果您的项目有

werkzeug
作为依赖项,您可以使用
werkzeug._reloader.run_with_reloader
函数在代码更改时自动重新加载 celery Worker。另外,它可以与 PyCharm 调试器配合使用。

"""
Filename: celery_dev.py
"""

import sys

from werkzeug._reloader import run_with_reloader

# this is the celery app path in my application, change it according to your project
from web.app import celery_app


def run():
    # create copy of "argv" and remove script name
    argv = sys.argv.copy()
    argv.pop(0)

    # start the celery worker
    celery_app.worker_main(argv)


if __name__ == '__main__':
    run_with_reloader(run)

PyCharm 调试配置示例。

注意:

这是一个私有

werkzeug
API,从
Werkzeug==2.0.3
开始运行。它可能会在未来版本中停止工作。使用需要您自担风险。


3
投票

@AlexTT 答案中有一个问题,我不知道是否应该评论他的答案并将其作为答案。

您可以使用watchmedo

pip install watchdog

通过watchmedo间接启动celeryworker

watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A <app> worker --concurrency=1 --loglevel=INFO

2
投票

OrangeTux 的解决方案对我来说并不奏效,因此我编写了一些 Python 脚本来实现大致相同的效果。它使用 inotify 监视文件更改,并在检测到

IN_MODIFY
IN_ATTRIB
IN_DELETE
时触发 celery 重新启动。

#!/usr/bin/env python
"""Runs a celery worker, and reloads on a file change. Run as ./run_celery [directory]. If
directory is not given, default to cwd."""
import os
import sys
import signal
import time

import multiprocessing
import subprocess
import threading

import inotify.adapters


CELERY_CMD = tuple("celery -A amcat.amcatcelery worker -l info -Q amcat".split())
CHANGE_EVENTS = ("IN_MODIFY", "IN_ATTRIB", "IN_DELETE")
WATCH_EXTENSIONS = (".py",)

def watch_tree(stop, path, event):
    """
    @type stop: multiprocessing.Event
    @type event: multiprocessing.Event
    """
    path = os.path.abspath(path)

    for e in inotify.adapters.InotifyTree(path).event_gen():
        if stop.is_set():
            break

        if e is not None:
            _, attrs, path, filename = e

            if filename is None:
                continue

            if any(filename.endswith(ename) for ename in WATCH_EXTENSIONS):
                continue

            if any(ename in attrs for ename in CHANGE_EVENTS):
                event.set()


class Watcher(threading.Thread):
    def __init__(self, path):
        super(Watcher, self).__init__()
        self.celery = subprocess.Popen(CELERY_CMD)
        self.stop_event_wtree = multiprocessing.Event()
        self.event_triggered_wtree = multiprocessing.Event()
        self.wtree = multiprocessing.Process(target=watch_tree, args=(self.stop_event_wtree, path, self.event_triggered_wtree))
        self.wtree.start()
        self.running = True

    def run(self):
        while self.running:
            if self.event_triggered_wtree.is_set():
                self.event_triggered_wtree.clear()
                self.restart_celery()
            time.sleep(1)

    def join(self, timeout=None):
        self.running = False
        self.stop_event_wtree.set()
        self.celery.terminate()
        self.wtree.join()
        self.celery.wait()
        super(Watcher, self).join(timeout=timeout)

    def restart_celery(self):
        self.celery.terminate()
        self.celery.wait()
        self.celery = subprocess.Popen(CELERY_CMD)


if __name__ == '__main__':
    watcher = Watcher(sys.argv[1] if len(sys.argv) > 1 else ".")
    watcher.start()

    signal.signal(signal.SIGINT, lambda signal, frame: watcher.join())
    signal.pause()

您可能应该更改

CELERY_CMD
或任何其他全局变量。


1
投票

这是我在 Django 中实现它的方式:

# worker_dev.py (put it next to manage.py)
from django.utils import autoreload


def run_celery():
    from projectname import celery_app

    celery_app.worker_main(["-Aprojectname", "-linfo", "-Psolo"])


print("Starting celery worker with autoreload...")
autoreload.run_with_reloader(run_celery)

然后运行

python worker_dev.py
。这具有在 docker 容器内工作的优点。


0
投票

这是 Suor 代码的巨大改编。

我制作了一个自定义 Django 命令,可以这样调用:

python manage.py runcelery

所以,每次代码更改时,celery 的主进程都会被优雅地杀死,然后再次执行。

根据需要更改

CELERY_COMMAND
变量。

# File: runcelery.py
import os
import signal
import subprocess
import time

import psutil
from django.core.management.base import BaseCommand
from django.utils import autoreload


DELAY_UNTIL_START = 5.0
CELERY_COMMAND = 'celery --config my_project.celeryconfig worker --loglevel=INFO'


class Command(BaseCommand):

    help = ''

    def kill_celery(self, parent_pid):
        os.kill(parent_pid, signal.SIGTERM)

    def run_celery(self):
        time.sleep(DELAY_UNTIL_START)
        subprocess.run(CELERY_COMMAND.split(' '))

    def get_main_process(self):
        for process in psutil.process_iter():
            if process.ppid() == 0:  # PID 0 has no parent
                continue

            parent = psutil.Process(process.ppid())

            if process.name() == 'celery' and parent.name() == 'celery':
                return parent

        return

    def reload_celery(self):
        parent = self.get_main_process()

        if parent is not None:
            self.stdout.write('[*] Killing Celery process gracefully..')
            self.kill_celery(parent.pid)

        self.stdout.write('[*] Starting Celery...')
        self.run_celery()

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.reload_celery)

0
投票

另一种解决方案。 我在 Docker 上使用 Django 5.0 + Celery 5.2.7。

我实现了可以在这里找到的内容: https://testdriven.io/courses/django-celery/auto-reload/

celery_worker.py -> 在应用程序的 management/commands/ 文件夹中

import shlex
import subprocess
import sys

from django.core.management.base import BaseCommand
from django.utils import autoreload


def restart_celery():
    celery_worker_cmd = "celery -A <app> worker"
    cmd = f'pkill -f "{celery_worker_cmd}"'
    if sys.platform == "win32":
        cmd = "taskkill /f /t /im celery.exe"
    
    subprocess.call(shlex.split(cmd))
    cmd = f'{celery_worker_cmd} -l info'
    subprocess.call(shlex.split(cmd)) 


class Command(BaseCommand):

    def handle(self, *args, **options):
        print('Starting celery worker with autoreload...')
        autoreload.run_with_reloader(restart_celery)

docker-compose.yml

  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - .:/code

  celery:
    build: .
    command: python manage.py celery_worker
    volumes:
      - ./:/code

非常重要,您的网络应用程序和 celery 的 2 个卷是相同的,否则文件将无法在更改时正确更新。

就我而言,网络应用程序不使用我用于 celery 的一些文件。 这会导致自动重新加载忽略此类文件。这意味着使用上面的代码,如果您更改仅由 celery 使用的脚本,它仍然不会触发自动重新加载。

我关注了

Django 自动重载:添加监视文件 .

这就是我更改应用程序文件的方式。

apps.py

from django.apps import AppConfig from django.utils.autoreload import autoreload_started def celery_watchdog(sender, **kwargs): sender.watch_dir('/code/core/logic', '*.py') class CoreConfig(AppConfig): default_auto_field = 'django.db.models.BigAutoField' name = 'core' def ready(self): autoreload_started.connect(celery_watchdog)
    
© www.soinside.com 2019 - 2024. All rights reserved.