如何从芹菜任务中发出事件

问题描述 投票:0回答:2

基本上,我试图在一个单独的线程中在服务器端生成事件。我有一个celery.task应该发出事件,但它的代码永远不会被执行。

import json
import time

from celery import Celery

from flask import Flask
from flask import jsonify
from flask import render_template
from flask import request

from flask_socketio import SocketIO

broker_url = "redis://localhost:6379/1"

celery = Celery(broker=broker_url)
app = Flask(__name__)
socketio = SocketIO(app, message_queue=broker_url)


@celery.task
def countdown(n):
    print("countdown", n)
    for i in range(n+1):
        time.sleep(1)
        socketio.emit(
            "countdown",
            {"remaining": n - i},
            namespace="/test/"
        )


@app.route("/")
def index():
    return render_template("index.html")


@app.route("/start_countdown/", methods=["POST"])
def start_countdown():
    data = json.loads(request.data.decode())
    countdown.delay([int(data["time"])])
    return jsonify(time_to_wait=data["time"])


if __name__ == '__main__':
    socketio.run(debug=True)

视图反应良好,但任务是沉默的,我无法理解,为什么?

UPD

我重新安排了我的代码,如here。文件夹结构完全相同,文件也一样。在app/main文件夹中,我有额外的tasks.py文件。

import time

from celery import Celery
from flask_socketio import emit

from app import socketio
from config import broker_url


celery = Celery(broker=broker_url)

@celery.task
def countdown(n):
    print(n)

    for i in range(n+1):
        time.sleep(1)
        print("Socket", socketio)
        print("Server", socketio.server)
        socketio.emit(
            "countdown",
            {"remaining": n - i},
            namespace="/test/"
        )

我用celery -A app.main.tasks worker命令开始芹菜工人。当执行countdown任务的代码时,它会因以下异常而失败:

[2017-10-12 19:04:07,797: WARNING/ForkPoolWorker-1] 13
[2017-10-12 19:04:08,799: WARNING/ForkPoolWorker-1] <flask_socketio.SocketIO object at 0x7f07d2a0fc50>
[2017-10-12 19:04:08,803: ERROR/ForkPoolWorker-1] Task app.main.tasks.countdown[68ae2e43-6ab7-4d52-8b3a-a9aaff46c489] raised unexpected: AttributeError("'NoneType' object has no attribute 'emit'",)
Traceback (most recent call last):
  File "/path/to/venv/lib/python3.4/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/path/to/venv/lib/python3.4/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/path/to/tasks.py", line 24, in countdown
    namespace="/test/"
  File "/path/to/venv/lib/python3.4/site-packages/flask_socketio/__init__.py", line 357, in emit
    self.server.emit(event, *args, namespace=namespace, room=room,
AttributeError: 'NoneType' object has no attribute 'emit'

我的任务中的socketio.server由于某种原因是None,而在app/main/events.py文件中它是适当的对象。看起来在我的任务中socketio对象未完全初始化,可能,因为在芹菜流程执行流程不同,但我不知道如何解决它。

python flask celery flask-socketio
2个回答
0
投票

你的celery任务是否在一个单独的tasks.py模块中?例如:

from celery import Celery

celery = Celery('tasks', broker=broker_url)
socketio = SocketIO(app, message_queue=broker_url)

@celery.task
def countdown(n):
    print("countdown", n)
    for i in range(n+1):
        time.sleep(1)
        socketio.emit(
            "countdown",
            {"remaining": n - i},
            namespace="/test/"
        )

然后你可以在提示符下启动这个worker:

 celery -A tasks worker --loglevel=info

然后你应该运行你的主要代码并添加:

 from tasks import countdown

并调用您的倒计时功能,该功能将在一个单独的过程中运行


0
投票

我只能将它与Redis作为经纪人合作,浪费时间尝试使用RabbitMQ作为代理,并且无法从芹菜任务中发出消息。确保使用Flask SocketIO构造函数创建一个新实例,除了主烧瓶应用程序的socketio实例。

© www.soinside.com 2019 - 2024. All rights reserved.