我正在使用AsyncWebSocketConsumer
,我想让消费者在处理一个特定事件时睡眠10秒钟。因此,收到该事件后,我将呼叫await asyncio.sleep(10)
。但这是一个问题,它在接下来的10秒内将事件阻止到此使用者实例。
我的问题是,如果我使用的是AsyncConsumer
,即使我处于异步睡眠状态,它也不应该继续发送将来的事件吗?
我一直在浏览daphne服务器的代码,发现以下内容:
def onMessage(self, payload, isBinary):
# If we're muted, do nothing.
if self.muted:
logger.debug("Muting incoming frame on %s", self.client_addr)
return
logger.debug("WebSocket incoming frame on %s", self.client_addr)
self.last_ping = time.time()
if isBinary:
self.application_queue.put_nowait(
{"type": "websocket.receive", "bytes": payload}
)
else:
self.application_queue.put_nowait(
{"type": "websocket.receive", "text": payload.decode("utf8")}
)
这意味着在从前端接收到新事件时,它将创建一个新任务并将其添加到异步队列中以供使用者使用。那么,为什么要睡在一个异步任务上阻止其他任务呢?我在这里想念什么吗?
通过生产者-消费者模式,每个消费者都按顺序消耗队列:从队列获取>进程>从队列获取...在给定项目(或“事件”)的处理阶段,您要使消费者进入睡眠状态,因此该过程直到睡眠完成才结束。您不能消费下一项,这就是您的消费者被封锁的原因。您可以为该特定“事件”创建一个新的和[[dedicated使用者,这样睡眠就不会阻止其他事件的处理。参见asyncio.Queue,让您的AsyncWebSocketConsumer
拥有一个异步队列,并用.put_nowait()填充它,并让守护程序协程(新使用者)使用.get()来消耗它,新使用者可以在获得下一个项目之前先使用await asyncio.sleep(10)
。 。