我正在尝试使用
AsyncJsonWebsocketConsumer
类创建一个 websocket 通道。我想每 5 秒循环发送一次消息,我的 websocket 通道(ReactJS 应用程序客户端)的消费者也应该每 5 秒接收一次消息。虽然我的消费者应用程序在发送最终消息后或 receive_json
功能完成后立即收到所有消息。我似乎无法解决这个问题。
test.py
from channels.generic.websocket import AsyncJsonWebsocketConsumer
class TestController(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
async def receive_json(self, content: dict):
for i in range(0, 5):
await self.send_json({
"text": f"Loop - {i + 1}"
})
sleep(5)
await self.send_json({
"text": {
"type": "finished"
}
})
async def disconnect(self, close_code):
print()
print("DISCONNECTED")
print(close_code)
print()
routing.py
from django.urls import path
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from app.test import TestController
channel_routing = ProtocolTypeRouter({
"websocket": AllowedHostsOriginValidator(
AuthMiddlewareStack(
URLRouter([
path("websocket/test", TestController.as_asgi())
])
)
)
})
这段代码非常混乱。你想做什么?
你的
async def receive_json(self, content: dict):
应该在每次收到消息时被调用。
所以在撰写本文时,如果您通过将
sleep(5)
替换为 await asyncio.sleep(5)
来修复@Igonato 提到的错误,您的代码将在大约 25 秒内发送 6 条 ws 消息以响应每个传入的 ws 消息。
我喜欢将在我们的服务器上运行的 websocket 代码(在 consumers.py 中运行的代码)视为反应式代码。换句话说,代码对通过相同或另一个 websocket 连接发送给它的命令做出反应。这也有助于节省服务器资源;我们让用户计算时间并将我们的答案发回给他们,而不是为多个用户运行多个循环或计算时间。
考虑一个“TestController”或守护进程,它通过向用户显示所需的消息来响应发送给它的命令。客户端正在运行一个 javascript 或 reactJS 循环,每 5 秒发送一条消息。我在这里看到您在 for 循环范围内使用计数器,您可以在 javascript 中做同样的事情,在循环中跟踪计数,然后将其作为字符串中的变量发送到守护进程。我们将使用字符串的第一个字符来表示命令,我们将使用字符串的第二个字符来表示该命令函数的变量。
考虑以下代码,了解如何设置接收函数以对不同的命令做出反应。当我们将数据从前端客户端发送到我们的 websocket 时,将您的
content
设置为包含您需要的每个字符串或字符的字典。
var pingNum = 1;
var pingID = 1;
function newFunction() {
socket.send(JSON.stringify({
"command": "ping",
"pingID" : pingID,
"pingNum": pingNum
}));
pingNum += 1;
}
setInterval(newFunction, 5000);
接下来,让我们在
receive_json
函数中接收该 json,并查找 JSON 中的每个键,看看它说了什么来指导我们的程序前进并响应客户端。
class TestController(AsyncJsonWebsocketConsumer):
##### WebSocket event handlers
async def connect(self):
"""
Called when the websocket is handshaking as part of initial connection.
"""
# Accept the connection
await self.accept()
async def receive_json(self, content):
"""
Called when we get a text frame. Channels will JSON-decode the payload
for us and pass it as the first argument.
"""
# Messages will have a "command" key we can switch on, otherwise its a ready check
command = content.get("command", None)
pingNum = content.get("pingNum", None)
if command == "ping":
if pingNum <= 5:
await self.send_json({
"text": "Loop - " + str(pingNum)
})
else:
await self.send_json({
"text": {
"type": "finished"
}
})
请使用 await asyncio.sleep(5)
import asyncio
from channels.generic.websocket import AsyncJsonWebsocketConsumer
class TestController(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
async def receive_json(self, content: dict):
for i in range(0, 5):
await self.send_json({
"text": f"Loop - {i + 1}"
})
await asyncio.sleep(5)
await self.send_json({
"text": {
"type": "finished"
}
})
async def disconnect(self, close_code):
print()
print("DISCONNECTED")
print(close_code)
print()
是因为你使用了阻塞睡眠功能。相反,您应该使用异步 asyncio.sleep 函数,它不会阻塞事件循环。
例如
import asyncio
from channels.generic.websocket import AsyncJsonWebsocketConsumer
class TestController(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
async def receive_json(self, content: dict):
for i in range(5):
await self.send_json({"text": f"Loop - {i + 1}"})
await asyncio.sleep(5)
await self.send_json({"text": {"type": "finished"}})
async def disconnect(self, close_code):
print("DISCONNECTED")
print(close_code)
那应该很完美。