Flask-SocketIO和Celery任务,socketio.emit()在任务完成后不向客户端发出

问题描述 投票:0回答:1

在成功执行芹菜任务后尝试使用flask-socketio对所有连接的客户端进行emit()时遇到麻烦。

这个想法如下:

调用flask端点 - >触发工人的芹菜任务

# api.py
@app.route('/api/task', method=['GET'])
def trigger_celery_task():
     celery_task.delay()

在完成任务之前,芹菜工作者调用另一个烧瓶api端点,该端点触发套接字服务器发出事件

# tasks.py
from server.app import create_celery_app # see below
celery = create_celery_app()

@celery.task
def celery_task():
     # exec some long task
     import requests
     url='http://backend:5000/api/update_data'
     requests.get(url)
     return None

# api.py
@app.route('/api/update_data', method=['GET'])
def trigger_update():
     from server.app import socketio
     socketio.emit('new_data_available', {'message': 'update'})

我的烧瓶应用程序给我以下错误消息(当连接2个客户端时):

"GET /api/update_data HTTP/1.1" 200
Cannot send to sid 85d...
Cannot send to sid 52c...

但是,如果我手动调用端点/api/update_data,套接字服务器会正​​确发出事件并且所有客户端都会收到它:

85d...: Sending packet MESSAGE data 2["new_data_available",{"message":"update"}]
52c...: Sending packet MESSAGE data 2["new_data_available",{"message":"update"}]
"GET /api/update_data HTTP/1.1" 200

我的配置如下:

# app.py

# initialize sql-alchemy
db = SQLAlchemy()

# initialize socketIO server
socketio = SocketIO(path='/api/socket-io', engineio_logger=True)

def create_app(config_name):
    # initialize flask app
    app = FlaskAPI(__name__, instance_relative_config=True)

    # configuration
    app.config.from_object(app_config[config_name])
    CORS(app)
    db.init_app(app)
    socketio.init_app(app)

    # add blueprints/routes
    # ...

    return app

def create_celery_app(app=None):
    config_name = os.getenv('APP_SETTINGS')
    app = app or create_app(config_name)

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

最后运行配置:

# run.py
import os
from server.app import create_app

config_name = os.getenv('APP_SETTINGS')  # config_name = "development"
app = create_app(config_name)

if __name__ == '__main__':
    # app.run(host='0.0.0.0', port=5000, threaded=True)
    from server.app import socketio
    socketio.run(app, host='0.0.0.0', port=5000)

请注意,我正在Docker容器中运行完整的应用程序,这些容器由docker-compose编排,以防万一。

非常感谢,我感谢任何帮助。

python flask socket.io celery flask-socketio
1个回答
0
投票

我可以轻松地使用Redis,但不能将RabbitMQ作为代理。只需在芹菜装饰任务中创建一个新的SocketIO(与包装烧瓶应用程序的socketio实例分开)实例,然后你就可以在那里发射它并以某种方式获取应用程序进程......

© www.soinside.com 2019 - 2024. All rights reserved.