当
CELERY_IMPORTS
中的 settings.py
中的模块发生更改时,我可以使 celery 自动重新加载。
我尝试让母模块检测子模块的更改,但它没有检测到子模块的更改。这让我明白检测不是由 celery 递归完成的。我在文档中搜索了它,但没有找到针对我的问题的任何回复。
将项目中所有与 celery 相关的部分添加到
CELERY_IMPORTS
来检测变化真的很困扰我。
有没有办法告诉celery“当项目的任何地方有任何变化时自动重新加载”。
谢谢您!
Celery
--autoreload
不起作用,它已被 弃用。
既然你使用的是django,你可以为此编写一个管理命令。 Django 有 autoreload 实用程序,runserver 使用它在代码更改时重新启动 WSGI 服务器。
相同的功能可用于重新加载 celery 工作线程。创建一个名为 celery 的单独管理命令。编写一个函数来杀死现有的工作人员并启动一个新的工作人员。现在将此函数挂接到自动重新加载,如下所示。
import shlex
import subprocess
from django.core.management.base import BaseCommand
from django.utils import autoreload
def restart_celery():
cmd = 'pkill celery'
subprocess.call(shlex.split(cmd))
cmd = 'celery worker -l info -A foo'
subprocess.call(shlex.split(cmd))
class Command(BaseCommand):
def handle(self, *args, **options):
print('Starting celery worker with autoreload...')
# For Django>=2.2
autoreload.run_with_reloader(restart_celery)
# For django<2.1
# autoreload.main(restart_celery)
现在您可以使用
python manage.py celery
运行 celery Worker,它将在代码库更改时自动重新加载。
这仅用于开发目的,请勿在生产中使用。代码取自我的其他答案。
您可以使用watchmedo
pip install watchdog
通过watchmedo间接启动celeryworker
watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery worker --app=worker.app --concurrency=1 --loglevel=INFO
您可以使用
-I|--include
手动包含其他模块。将其与 find
和 awk
等 GNU 工具结合使用,您将能够找到所有 .py
文件并包含它们。
$ celery -A app worker --autoreload --include=$(find . -name "*.py" -type f | awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' | sed 's/.$//')
让我们解释一下:
find . -name "*.py" -type f
find
递归搜索包含 .py
的所有文件。输出看起来像这样:
./app.py
./some_package/foopy
./some_package/bar.py
然后:
awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=','
此行将
find
的输出作为输入,并删除所有出现的 ./
。然后它将所有 /
替换为 .
。最后一个 sub()
删除并用空字符串替换 .py
。 ORS
将所有换行符替换为 ,
。输出:
app,some_package.foo,some_package.bar,
最后一个命令
sed
删除最后一个 ,
。
因此正在执行的命令如下所示:
$ celery -A app worker --autoreload --include=app,some_package.foo,some_package.bar
如果源代码中有
virtualenv
,您可以通过添加 -path .path_to_your_env -prune -o
: 来排除它
$ celery -A app worker --autoreload --include=$(find . -path .path_to_your_env -prune -o -name "*.py" -type f | awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' | sed 's/.$//')
我使用了
watchdog
watchdemo
实用程序,它工作得很好,但由于某种原因,PyCharm 调试器无法调试由 watchdemo
生成的子进程。
因此,如果您的项目有
werkzeug
作为依赖项,您可以使用 werkzeug._reloader.run_with_reloader
函数在代码更改时自动重新加载 celery Worker。另外,它可以与 PyCharm 调试器配合使用。
"""
Filename: celery_dev.py
"""
import sys
from werkzeug._reloader import run_with_reloader
# this is the celery app path in my application, change it according to your project
from web.app import celery_app
def run():
# create copy of "argv" and remove script name
argv = sys.argv.copy()
argv.pop(0)
# start the celery worker
celery_app.worker_main(argv)
if __name__ == '__main__':
run_with_reloader(run)
PyCharm 调试配置示例。
注意:
这是一个私有
werkzeug
API,从 Werkzeug==2.0.3
开始运行。它可能会在未来版本中停止工作。使用需要您自担风险。
@AlexTT 答案中有一个问题,我不知道是否应该评论他的答案并将其作为答案。
您可以使用watchmedo
pip install watchdog
通过watchmedo间接启动celeryworker
watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A <app> worker --concurrency=1 --loglevel=INFO
OrangeTux 的解决方案对我来说并不奏效,因此我编写了一些 Python 脚本来实现大致相同的效果。它使用 inotify 监视文件更改,并在检测到
IN_MODIFY
、IN_ATTRIB
或 IN_DELETE
时触发 celery 重新启动。
#!/usr/bin/env python
"""Runs a celery worker, and reloads on a file change. Run as ./run_celery [directory]. If
directory is not given, default to cwd."""
import os
import sys
import signal
import time
import multiprocessing
import subprocess
import threading
import inotify.adapters
CELERY_CMD = tuple("celery -A amcat.amcatcelery worker -l info -Q amcat".split())
CHANGE_EVENTS = ("IN_MODIFY", "IN_ATTRIB", "IN_DELETE")
WATCH_EXTENSIONS = (".py",)
def watch_tree(stop, path, event):
"""
@type stop: multiprocessing.Event
@type event: multiprocessing.Event
"""
path = os.path.abspath(path)
for e in inotify.adapters.InotifyTree(path).event_gen():
if stop.is_set():
break
if e is not None:
_, attrs, path, filename = e
if filename is None:
continue
if any(filename.endswith(ename) for ename in WATCH_EXTENSIONS):
continue
if any(ename in attrs for ename in CHANGE_EVENTS):
event.set()
class Watcher(threading.Thread):
def __init__(self, path):
super(Watcher, self).__init__()
self.celery = subprocess.Popen(CELERY_CMD)
self.stop_event_wtree = multiprocessing.Event()
self.event_triggered_wtree = multiprocessing.Event()
self.wtree = multiprocessing.Process(target=watch_tree, args=(self.stop_event_wtree, path, self.event_triggered_wtree))
self.wtree.start()
self.running = True
def run(self):
while self.running:
if self.event_triggered_wtree.is_set():
self.event_triggered_wtree.clear()
self.restart_celery()
time.sleep(1)
def join(self, timeout=None):
self.running = False
self.stop_event_wtree.set()
self.celery.terminate()
self.wtree.join()
self.celery.wait()
super(Watcher, self).join(timeout=timeout)
def restart_celery(self):
self.celery.terminate()
self.celery.wait()
self.celery = subprocess.Popen(CELERY_CMD)
if __name__ == '__main__':
watcher = Watcher(sys.argv[1] if len(sys.argv) > 1 else ".")
watcher.start()
signal.signal(signal.SIGINT, lambda signal, frame: watcher.join())
signal.pause()
您可能应该更改
CELERY_CMD
或任何其他全局变量。
这是我在 Django 中实现它的方式:
# worker_dev.py (put it next to manage.py)
from django.utils import autoreload
def run_celery():
from projectname import celery_app
celery_app.worker_main(["-Aprojectname", "-linfo", "-Psolo"])
print("Starting celery worker with autoreload...")
autoreload.run_with_reloader(run_celery)
然后运行
python worker_dev.py
。这具有在 docker 容器内工作的优点。
这是 Suor 代码的巨大改编。
我制作了一个自定义 Django 命令,可以这样调用:
python manage.py runcelery
所以,每次代码更改时,celery 的主进程都会被优雅地杀死,然后再次执行。
根据需要更改
CELERY_COMMAND
变量。
# File: runcelery.py
import os
import signal
import subprocess
import time
import psutil
from django.core.management.base import BaseCommand
from django.utils import autoreload
DELAY_UNTIL_START = 5.0
CELERY_COMMAND = 'celery --config my_project.celeryconfig worker --loglevel=INFO'
class Command(BaseCommand):
help = ''
def kill_celery(self, parent_pid):
os.kill(parent_pid, signal.SIGTERM)
def run_celery(self):
time.sleep(DELAY_UNTIL_START)
subprocess.run(CELERY_COMMAND.split(' '))
def get_main_process(self):
for process in psutil.process_iter():
if process.ppid() == 0: # PID 0 has no parent
continue
parent = psutil.Process(process.ppid())
if process.name() == 'celery' and parent.name() == 'celery':
return parent
return
def reload_celery(self):
parent = self.get_main_process()
if parent is not None:
self.stdout.write('[*] Killing Celery process gracefully..')
self.kill_celery(parent.pid)
self.stdout.write('[*] Starting Celery...')
self.run_celery()
def handle(self, *args, **options):
autoreload.run_with_reloader(self.reload_celery)
另一种解决方案。 我在 Docker 上使用 Django 5.0 + Celery 5.2.7。
我实现了可以在这里找到的内容: https://testdriven.io/courses/django-celery/auto-reload/
celery_worker.py -> 在应用程序的 management/commands/ 文件夹中
import shlex
import subprocess
import sys
from django.core.management.base import BaseCommand
from django.utils import autoreload
def restart_celery():
celery_worker_cmd = "celery -A <app> worker"
cmd = f'pkill -f "{celery_worker_cmd}"'
if sys.platform == "win32":
cmd = "taskkill /f /t /im celery.exe"
subprocess.call(shlex.split(cmd))
cmd = f'{celery_worker_cmd} -l info'
subprocess.call(shlex.split(cmd))
class Command(BaseCommand):
def handle(self, *args, **options):
print('Starting celery worker with autoreload...')
autoreload.run_with_reloader(restart_celery)
docker-compose.yml
web:
build: .
command: python manage.py runserver 0.0.0.0:8000
volumes:
- .:/code
celery:
build: .
command: python manage.py celery_worker
volumes:
- ./:/code
非常重要,您的网络应用程序和 celery 的 2 个卷是相同的,否则文件将无法在更改时正确更新。
就我而言,网络应用程序不使用我用于 celery 的一些文件。 这会导致自动重新加载忽略此类文件。这意味着使用上面的代码,如果您更改仅由 celery 使用的脚本,它仍然不会触发自动重新加载。我关注了
Django 自动重载:添加监视文件 .
这就是我更改应用程序文件的方式。apps.py
from django.apps import AppConfig
from django.utils.autoreload import autoreload_started
def celery_watchdog(sender, **kwargs):
sender.watch_dir('/code/core/logic', '*.py')
class CoreConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'core'
def ready(self):
autoreload_started.connect(celery_watchdog)