django websocket 在循环中异步发送消息

问题描述 投票:0回答:4

我正在尝试使用

AsyncJsonWebsocketConsumer
类创建一个 websocket 通道。我想每 5 秒循环发送一次消息,我的 websocket 通道(ReactJS 应用程序客户端)的消费者也应该每 5 秒接收一次消息。虽然我的消费者应用程序在发送最终消息后或
receive_json
功能完成后立即收到所有消息。我似乎无法解决这个问题。

test.py

from channels.generic.websocket import AsyncJsonWebsocketConsumer



class TestController(AsyncJsonWebsocketConsumer):

    async def connect(self):
        await self.accept()


    async def receive_json(self, content: dict):

        for i in range(0, 5):

            await self.send_json({
                "text": f"Loop - {i + 1}"
            })

            sleep(5)

        await self.send_json({
            "text": {
                "type": "finished"
            }
        })


    async def disconnect(self, close_code):
        print()
        print("DISCONNECTED")
        print(close_code)
        print()

routing.py

from django.urls import path

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator

from app.test import TestController



channel_routing = ProtocolTypeRouter({
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter([
                path("websocket/test", TestController.as_asgi())
            ])
        )
    )
})
python django django-channels
4个回答
0
投票

这段代码非常混乱。你想做什么?

你的

async def receive_json(self, content: dict):
应该在每次收到消息时被调用。

所以在撰写本文时,如果您通过将

sleep(5)
替换为
await asyncio.sleep(5)
来修复@Igonato 提到的错误,您的代码将在大约 25 秒内发送 6 条 ws 消息以响应每个传入的 ws 消息。


0
投票

我喜欢将在我们的服务器上运行的 websocket 代码(在 consumers.py 中运行的代码)视为反应式代码。换句话说,代码对通过相同或另一个 websocket 连接发送给它的命令做出反应。这也有助于节省服务器资源;我们让用户计算时间并将我们的答案发回给他们,而不是为多个用户运行多个循环或计算时间。

考虑一个“TestController”或守护进程,它通过向用户显示所需的消息来响应发送给它的命令。客户端正在运行一个 javascript 或 reactJS 循环,每 5 秒发送一条消息。我在这里看到您在 for 循环范围内使用计数器,您可以在 javascript 中做同样的事情,在循环中跟踪计数,然后将其作为字符串中的变量发送到守护进程。我们将使用字符串的第一个字符来表示命令,我们将使用字符串的第二个字符来表示该命令函数的变量。

考虑以下代码,了解如何设置接收函数以对不同的命令做出反应。当我们将数据从前端客户端发送到我们的 websocket 时,将您的

content
设置为包含您需要的每个字符串或字符的字典。

 var pingNum = 1;
 var pingID = 1;
 function newFunction() {
            socket.send(JSON.stringify({
                "command": "ping",
                "pingID" : pingID,
                "pingNum": pingNum
            }));
            pingNum += 1;
 }
 setInterval(newFunction, 5000);

接下来,让我们在

receive_json
函数中接收该 json,并查找 JSON 中的每个键,看看它说了什么来指导我们的程序前进并响应客户端。

class TestController(AsyncJsonWebsocketConsumer):

##### WebSocket event handlers

async def connect(self):
    """
    Called when the websocket is handshaking as part of initial connection.
    """
    # Accept the connection
    await self.accept()

async def receive_json(self, content):
    """
    Called when we get a text frame. Channels will JSON-decode the payload
    for us and pass it as the first argument.
    """
    # Messages will have a "command" key we can switch on, otherwise its a ready check
    command = content.get("command", None)
    pingNum = content.get("pingNum", None)

    if command == "ping":
        if pingNum <= 5:
            await self.send_json({
                "text": "Loop - " + str(pingNum)
            })
        else:
            await self.send_json({
                "text": {
                "type": "finished"
                }
            })

0
投票

请使用 await asyncio.sleep(5)

import asyncio
from channels.generic.websocket import AsyncJsonWebsocketConsumer


class TestController(AsyncJsonWebsocketConsumer):

    async def connect(self):
        await self.accept()

    async def receive_json(self, content: dict):
        for i in range(0, 5):
            await self.send_json({
                "text": f"Loop - {i + 1}"
            })
            await asyncio.sleep(5)

        await self.send_json({
            "text": {
                "type": "finished"
            }
        })

    async def disconnect(self, close_code):
        print()
        print("DISCONNECTED")
        print(close_code)
        print()

0
投票

是因为你使用了阻塞睡眠功能。相反,您应该使用异步 asyncio.sleep 函数,它不会阻塞事件循环。

例如

import asyncio
from channels.generic.websocket import AsyncJsonWebsocketConsumer

class TestController(AsyncJsonWebsocketConsumer):
    async def connect(self):
        await self.accept()

    async def receive_json(self, content: dict):
        for i in range(5):
            await self.send_json({"text": f"Loop - {i + 1}"})
            await asyncio.sleep(5)

        await self.send_json({"text": {"type": "finished"}})

    async def disconnect(self, close_code):
        print("DISCONNECTED")
        print(close_code)

那应该很完美。

© www.soinside.com 2019 - 2024. All rights reserved.