无法使websockets测试通过Django通道

问题描述 投票:0回答:2

给出如下所示的Django Channels使用者:

class NotificationConsumer(JsonWebsocketConsumer):
    def connect(self):
        user = self.scope["user"]
        async_to_sync(self.channel_layer.group_add)("hello", "hello")
        self.accept()
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )

    def receive_json(self, content, **kwargs):
        print(content)
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )
        print("Here we are")

    def chat_message(self, event):
        self.send_json(content=event["content"])

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)("hello", "hello")

以及如下所示的测试:

@pytest.mark.asyncio
class TestWebsockets:

    async def test_receives_data(self, settings):

        communicator = WebsocketCommunicator(
            application=application, path="/ws/notifications/"
        )
        connected, _ = await communicator.connect()
        assert connected
        await communicator.send_json_to({"type": "notify", "data": "who knows"})
        response = await communicator.receive_json_from()
        await communicator.disconnect()

运行测试时,我总是得到TimeoutError。我需要做些什么?

[如果您想查看完整的回购示例,请查看https://github.com/phildini/websockets-test

django django-channels
2个回答
0
投票
© www.soinside.com 2019 - 2024. All rights reserved.