Celery docs 解释如何设置事件的自定义处理。然而,这种方法似乎不太适合我的 Django 项目和 celery。
我的
celery.py
文件看起来像
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')
# The custom monitor copied from celery docs
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('MY MON TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(), ))
with app.connection() as connection:
recv = app.events.Receiver(connection,
handlers={'task-failed': announce_failed_tasks, }
)
recv.capture(limit=None, timeout=None, wakeup=True)
app = Celery('myproj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# Monitoring
my_monitor(app)
我刚刚从文档中复制了示例监视器。
我没有收到错误,但启动 celery worker/beat 或 django 的 runserver 似乎永远挂起,直到我注释掉
my_monitor(app)
行。
在 Django 项目中执行此操作的正确方法是什么?
recv.capture 是一个阻塞调用。您从事件处理程序中得到任何输出吗?
如果您尝试将 django 服务器用作监视器和任务生产者/消费者,这是行不通的。
您的显示器应该是一个单独的独立应用程序。您还应该考虑消除 django 依赖,因为它是不必要的。
使用 Flower 监控和管理 Celery 任务
对于实时监控和管理 Celery 任务,Flower 是一个出色的工具。它是一个基于 Tornado 构建的基于 Web 的应用程序,提供用于流式传输任务事件和状态的 WebSocket 服务器。
安装与配置:
安装Flower:使用
pipenv
进行依赖管理,结合pip
和virtualenv
。安装花:
pipenv install flower
启动 Flower:要监控您的 Celery 应用程序,请使用以下命令启动 Flower:
flower -A $projectname -l info
将
$projectname
替换为您的项目名称。
通过事件报告启动 Celery Worker:通过使用以下命令启动 Celery Worker 在 Flower 中启用任务监控:
celery -A $projectname worker -E -B -l info
使用的标志:
-E
:启用任务监控事件。-B
:启动 Celery Beat 执行计划任务。与 Django 集成:
要在 Django 应用程序中显示任务事件,您可以创建一个监听 Flower 的 WebSocket 并实时更新的模板。
模板放置:
myapp
,则路径将为 myapp/templates/myapp/task_monitor.html
。Django 模板示例:
{% extends "admin/base_site.html" %} <!-- Note: base.html is updated to base_site.html for newer Django versions -->
{% load static %}
{% block extrastyle %}
<style>
table, th, td {
border: 1px solid black;
border-collapse: collapse;
}
th, td {
padding: 5px;
}
</style>
{% endblock %}
{% block extrahead %}
<title>Monitor Celery Task Events</title>
<script language="javascript" type="text/javascript">
var ws_success = new WebSocket('ws://localhost:5555/api/task/events/task-succeeded/');
ws_success.onmessage = function (event) {
var outputDiv = document.getElementById("output");
var listItem = document.createElement('li');
listItem.innerText = event.data;
outputDiv.appendChild(listItem);
}
</script>
{% endblock %}
{% block content %}
<h2>Events:</h2>
<ul><div id="output"></div></ul>
{% endblock %}
此模板订阅 WebSocket 端点
ws://localhost:5555/api/task/events/task-succeeded/
,显示每个任务的成功事件。
WebSocket 端点:
ws://localhost:5555/api/task/events/task-succeeded/
ws://localhost:5555/dashboard
进一步步骤:
确保 Celery 在您的 Django 项目中配置正确。这涉及按照 Celery 的文档调整
$projectname/celery.py
、$projectname/__init__.py
和 celeryconfig.py
等文件。
其他资源:
通过此设置,您的 Django 应用程序将拥有一个用于监控 Celery 任务的界面,利用 Flower 的全面事件跟踪功能。