我有一个烧瓶socketio装饰的功能。在使用芹菜我的应用程序和即时消息在这个过程中的一部分,我想打电话给装饰下面的功能
@socketio.on('Example')
@authenticated_only
def example(data):
...
问题是,该事件使用CURRENT_USER里面,我怎么能模拟正常通话?只是代码eficiency所以我不必重新芹菜任务中完全一样的功能。
我想是这样的:
@celery.task()
def celery_task:
current_user=User.query.filter...
example(data)
但是我不能确定的是解决方案,我不甚至尝试
你可以这样做:
def basic_example(data, user):
# do everything here
@socketio.on('Example')
@authenticated_only
def example(data):
basic_example(data, current_user)
@celery.task()
def celery_task:
user = User.query.filter...
basic_example(data, user)