手动调用烧瓶socketio事件处理程序

问题描述 投票:0回答:1

我有一个烧瓶socketio装饰的功能。在使用芹菜我的应用程序和即时消息在这个过程中的一部分,我想打电话给装饰下面的功能

@socketio.on('Example')
@authenticated_only                            
def example(data):
    ...

问题是,该事件使用CURRENT_USER里面,我怎么能模拟正常通话?只是代码eficiency所以我不必重新芹菜任务中完全一样的功能。

我想是这样的:

@celery.task()
def celery_task:
    current_user=User.query.filter...
    example(data)

但是我不能确定的是解决方案,我不甚至尝试

flask flask-socketio
1个回答
0
投票

你可以这样做:

def basic_example(data, user):
    # do everything here

@socketio.on('Example')
@authenticated_only                            
def example(data):
    basic_example(data, current_user)

@celery.task()
def celery_task:
    user = User.query.filter...
    basic_example(data, user)
© www.soinside.com 2019 - 2024. All rights reserved.