我目前遇到一个特别棘手的问题,我会尽力向您解释。
我有一个Django项目,它的主要目的是从数据库快速执行排队的任务。我使用Celery和Celerybeat通过Django渠道实现此目标,以使用响应实时更新我的模板。
[Celery工人是一个具有大量线程的gevent工人池。
我的任务(简体版):
@shared_task
def exec_task(action_id):
# execute the action
action = Action.objects.get(pk=action_id)
response = post_request(action)
# update action status
if response.status_code == 200:
action.status = 'completed'
else:
action.status = 'failed'
# save the action to the DB
action.save()
channel_layer = get_channel_layer()
status_data = {'id': action.id, 'status': action.status}
status_data = json.dumps(status_data)
try:
async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
except:
event_loop = asyncio.get_running_loop()
future = asyncio.run_coroutine_threadsafe(channel_layer.group_send('channel_group', {'type': 'propergate_status', 'data': status_data}), event_loop)
result = future.result()
我的错误:
[2019-10-03 18:47:59,990:排队的警告/ MainProcess]操作:25
[2019-10-03 18:48:02,206:警告/ MainProcess]c:\ users \ jack \ documents \ github \ mcr-admin \ venv \ lib \ site-packages \ gevent_socket3.py:123:RuntimeWarning:从未等待协程'AsyncToSync.main_wrap'self._read_event = io_class(fileno,1)
RuntimeWarning:启用tracemalloc以获取对象分配回溯
[2019-10-03 18:48:02,212:警告/ MainProcess] c:\ users \ jack \ documents \ github \ mcr-admin \ venv \ lib \ site-packages \ gevent_socket3.py:123:RuntimeWarning:协程'BaseEventLoop.shutdown_asyncgens'从未出现等待的self._read_event = io_class(fileno,1)RuntimeWarning:
最初将操作保存到我刚刚调用的数据库后:
async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
但是我一直遇到运行时错误,因为如果已经有一个asyncio事件循环正在运行,则无法使用async_to_sync,as shown here at line 61。因此,我有多个gevent线程试图将async_to_sync紧密地连接在一起,从而不断在链接中引发错误。
这导致我进入this wonderful answer,并且当前的exec_task版本在向Django Channels组发送消息时具有98%的成功率,但我确实需要100%。
这里的问题是,在我添加的协程有机会完成之前,异步事件循环有时会停止,并且我一直在调整代码,使用asyncio和事件循环api玩,但是我要么破坏代码要么得到结果更糟。我觉得这可能与Asgiref async_to_sync函数尽早关闭循环有关,但它很复杂,而且我只是在几天前才开始使用python async。
欢迎任何反馈,评论,技巧或修复!
干杯。
我目前遇到一个特别棘手的问题,我将尽力解释。我有一个Django项目,它的主要目的是从数据库快速执行排队的任务。我用芹菜和...
您好,我目前遇到的确切问题是,从完成的芹菜任务向客户端发送消息至关重要。
最后,我无法解决问题,而是使用通道AsyncHttpConsumer