将 Celery Gevent Pool 与 asyncio (asgiref) 结合

问题描述 投票:0回答:1

我有很多用于 websocket 连接的异步代码。想法很简单:从 websocket 获取数据,计算一些数学并执行大量 io。

 data = await websocket.recv()
 # Do some math
 await obj.save()
 await redis.publish(data)
 await api_client.send(data)
 # And so on

这在独立容器中完美运行。

但是现在我又接到了另一个任务:

使 Celery Worker 每隔 10 分钟“重新检查”一次来自第三方 api 的数据,并执行所有相同的逻辑。所以我必须把所有这些东西放入芹菜任务中。目前我正在做这样的事情:

@app.task
def sync_with_api():
     for obj in objects_to_sync:
          asgiref.sync.async_to_sync(websocket_logic)(obj)

这是可行的,但我以同步方式完成所有这些,但我们期望同步数千个对象。所以我想知道性能。

我考虑过使用 gevent pool 并这样做了:

@app.task
 def sync_objects():
     for obj in objects_to_sync:
          app.send_task("core.sync_task", args=[obj])

@app.task
def sync_task(obj):
     asgiref.sync.async_to_sync(websocket_logic)(obj)

从我的角度来看,这应该效果更好。但我不确定将 gevent 与 asyncio 结合起来是否是个好主意,因为这是两种不同的并发模型。有没有人遇到过这样的问题?

或者我应该重构原始代码并制作两个版本:

  1. 异步使用 websockets
  2. 与 gevent 同步工作
celery python-asyncio django-celery gevent
1个回答
0
投票

我最终得到了解决方案:

async def sync_object(obj):
    await api_client.call()
    # some another logic

@app.task
def task_to_sync():
     objects = Objects.objects.all()
     loop = asyncio.get_event_loop()
     group = asyncio.gather(*[sync_objects(obj) for obj in objects])
     loop.run_until_complete(group)
     loop.close()

我有两个芹菜容器。一种用于 i/o 后台任务,gevent 作为并发模型。其次是使用 prefork 来处理常见任务和 cpu 限制。该任务应该每 10 分钟调用一次,因此这个解决方案足以处理它。所以我从 prefork celery 调用,它工作得足够好。

© www.soinside.com 2019 - 2024. All rights reserved.