我有很多用于 websocket 连接的异步代码。想法很简单:从 websocket 获取数据,计算一些数学并执行大量 io。
data = await websocket.recv()
# Do some math
await obj.save()
await redis.publish(data)
await api_client.send(data)
# And so on
这在独立容器中完美运行。
但是现在我又接到了另一个任务:
使 Celery Worker 每隔 10 分钟“重新检查”一次来自第三方 api 的数据,并执行所有相同的逻辑。所以我必须把所有这些东西放入芹菜任务中。目前我正在做这样的事情:
@app.task
def sync_with_api():
for obj in objects_to_sync:
asgiref.sync.async_to_sync(websocket_logic)(obj)
这是可行的,但我以同步方式完成所有这些,但我们期望同步数千个对象。所以我想知道性能。
我考虑过使用 gevent pool 并这样做了:
@app.task
def sync_objects():
for obj in objects_to_sync:
app.send_task("core.sync_task", args=[obj])
@app.task
def sync_task(obj):
asgiref.sync.async_to_sync(websocket_logic)(obj)
从我的角度来看,这应该效果更好。但我不确定将 gevent 与 asyncio 结合起来是否是个好主意,因为这是两种不同的并发模型。有没有人遇到过这样的问题?
或者我应该重构原始代码并制作两个版本:
我最终得到了解决方案:
async def sync_object(obj):
await api_client.call()
# some another logic
@app.task
def task_to_sync():
objects = Objects.objects.all()
loop = asyncio.get_event_loop()
group = asyncio.gather(*[sync_objects(obj) for obj in objects])
loop.run_until_complete(group)
loop.close()
我有两个芹菜容器。一种用于 i/o 后台任务,gevent 作为并发模型。其次是使用 prefork 来处理常见任务和 cpu 限制。该任务应该每 10 分钟调用一次,因此这个解决方案足以处理它。所以我从 prefork celery 调用,它工作得足够好。