Django频道从Celery任务发送组消息。 Asyncio事件循环在所有异步任务完成之前停止

问题描述 投票:2回答:2

我目前遇到一个特别棘手的问题,我会尽力向您解释。

我有一个Django项目,它的主要目的是从数据库快速执行排队的任务。我使用Celery和Celerybeat通过Django渠道实现此目标,以使用响应实时更新我的​​模板。

[Celery工人是一个具有大量线程的gevent工人池。

我的任务(简体版):

@shared_task
def exec_task(action_id):
  # execute the action
  action = Action.objects.get(pk=action_id)
  response = post_request(action)

  # update action status
  if response.status_code == 200:
    action.status = 'completed'

  else:
    action.status = 'failed'

  # save the action to the DB
  action.save()

  channel_layer = get_channel_layer()
  status_data = {'id': action.id, 'status': action.status}
  status_data = json.dumps(status_data)
  try:
    async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
  except:
    event_loop = asyncio.get_running_loop()
    future = asyncio.run_coroutine_threadsafe(channel_layer.group_send('channel_group', {'type': 'propergate_status', 'data': status_data}), event_loop)
    result = future.result()

我的错误:

[2019-10-03 18:47:59,990:排队的警告/ MainProcess]操作:25

[2019-10-03 18:48:02,206:警告/ MainProcess]c:\ users \ jack \ documents \ github \ mcr-admin \ venv \ lib \ site-packages \ gevent_socket3.py:123:RuntimeWarning:从未等待协程'AsyncToSync.main_wrap'self._read_event = io_class(fileno,1)

RuntimeWarning:启用tracemalloc以获取对象分配回溯

[2019-10-03 18:48:02,212:警告/ MainProcess] c:\ users \ jack \ documents \ github \ mcr-admin \ venv \ lib \ site-packages \ gevent_socket3.py:123:RuntimeWarning:协程'BaseEventLoop.shutdown_asyncgens'从未出现等待的self._read_event = io_class(fileno,1)RuntimeWarning:

最初将操作保存到我刚刚调用的数据库后:

async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})

但是我一直遇到运行时错误,因为如果已经有一个asyncio事件循环正在运行,则无法使用async_to_syncas shown here at line 61。因此,我有多个gevent线程试图将async_to_sync紧密地连接在一起,从而不断在链接中引发错误。

这导致我进入this wonderful answer,并且当前的exec_task版本在向Django Channels组发送消息时具有98%的成功率,但我确实需要100%。

这里的问题是,在我添加的协程有机会完成之前,异步事件循环有时会停止,并且我一直在调整代码,使用asyncio和事件循环api玩,但是我要么破坏代码要么得到结果更糟。我觉得这可能与Asgiref async_to_sync函数尽早关闭循环有关,但它很复杂,而且我只是在几天前才开始使用python async。

欢迎任何反馈,评论,技巧或修复!

干杯。

我目前遇到一个特别棘手的问题,我将尽力解释。我有一个Django项目,它的主要目的是从数据库快速执行排队的任务。我用芹菜和...

django celery gevent channels python-3.7.4
2个回答
1
投票

您好,我目前遇到的确切问题是,从完成的芹菜任务向客户端发送消息至关重要。


0
投票

最后,我无法解决问题,而是使用通道AsyncHttpConsumer

© www.soinside.com 2019 - 2024. All rights reserved.