django 通道 WebsocketCommunicator 超时错误

问题描述 投票:0回答:3

我正在尝试运行以下测试:

测试.py

from rest_framework.test import APITestCase
from myapp.routing import application
from channels.testing import WebsocketCommunicator
from account.models import User
from rest_framework.authtoken.models import Token

class Tests(APITestCase):
    def setUp(self):
        self.user = User.objects.create(email='[email protected]', 
                                        password='a password')
        self.token, created = Token.objects.get_or_create(user=self.user)

    async def test_connect(self):
        communicator = WebsocketCommunicator(application, f"/ws/user/{self.token}/")
        connected, subprotocol = await communicator.connect()
        self.assertTrue(connected)
        await communicator.disconnect()

application
channels.routing.ProtocolTypeRouter
的样板实例(如下所示:https://channels.readthedocs.io/en/latest/topics/routing.html)。生产中一切正常。测试退出并出现以下错误:

Traceback (most recent call last):
  File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 74, in receive_output
    return await self.output_queue.get()
  File "/usr/lib/python3.7/asyncio/queues.py", line 159, in get
    await getter
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/sync.py", line 223, in __call__
    return call_result.result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/sync.py", line 292, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/home/projects/myapp/myapp-api/app/tests.py", line 35, in test_connect
    connected, subprotocol = await communicator.connect()
  File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/channels/testing/websocket.py", line 36, in connect
    response = await self.receive_output(timeout)
  File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 85, in receive_output
    raise e
  File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 74, in receive_output
    return await self.output_queue.get()
  File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/timeout.py", line 66, in __aexit__
    self._do_exit(exc_type)
  File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/timeout.py", line 103, in _do_exit
    raise asyncio.TimeoutError
concurrent.futures._base.TimeoutError

----------------------------------------------------------------------
Ran 1 test in 1.026s

我已经尝试使用通道 3.0.4 和 django 3.2.10 和channels-redis 3.3.1(

'BACKEND': 'channels_redis.core.RedisChannelLayer'
settings.py中)使用python版本3.7.5、3.8.0和3.9.9。错误仍然存在。我做错了什么?

django testing django-rest-framework django-channels
3个回答
0
投票

我也有同样的问题。 APITestCase 或 TestCase 不允许事务,您必须使用 django 测试中的 SimpleTestCase,并将数据库设置为全部。正是有了这个区别,我认为它会起作用。 请注意,交易将在测试之间保存,并且在测试后不会回滚。

from django.test import SimpleTestCase
from myapp.routing import application
from channels.testing import WebsocketCommunicator
from account.models import User
from rest_framework.authtoken.models import Token


class Tests(SimpleTestCase):
    databases = '__all__'
    def setUp(self):
        self.user = User.objects.create(email='[email protected]', password='a password')
        self.token, created = Token.objects.get_or_create(user=self.user)

    async def test_connect(self):
        communicator = WebsocketCommunicator(application, f"/ws/user/{self.token}/")
        connected, subprotocol = await communicator.connect()
        self.assertTrue(connected)
        await communicator.disconnect()

这里是SimpleTestCase的信息 https://docs.djangoproject.com/en/4.0/topics/testing/tools/


0
投票

我遇到了类似的问题,解决方案通常是模仿您的生产路由器进行测试,即在实例化您的 Communicator 时还应该添加生产中使用的任何中间件或附加组件。例如在我的 asgi.py 中我有:

application = ProtocolTypeRouter(
{
    "http": get_asgi_application(),
    "websocket": AllowedHostsOriginValidator(
        jwt_auth_middleware_stack(URLRouter(chat.routing.websocket_urlpatterns)),
    ),
}
)

我的通讯器实例化如下:

communicator = WebsocketCommunicator(jwt_auth_middleware_stack(URLRouter(websocket_urlpatterns)),
                                     f"/ws/chat/{chat.id}/?token={token}")

我的网址是:

websocket_urlpatterns = [
    path("ws/chat/<str:chat_id>/", consumers.AsyncChatConsumer.as_asgi())
]

0
投票

我遇到了同样的问题,后来意识到

receive_from
方法需要响应。因此,如果您的消费者在收到消息时没有回复任何内容,您将收到超时错误。您可以通过在
await self.send_json("ok")
方法末尾添加
receive_json
来测试这一点。

解决方案相当简单:

class MockConsumer(Consumer):

    @override
    async def receive_json(self, content: dict, **kwargs):
        await super().receive_json(content, **kwargs)
        await self.send_json(content)


@pytest.mark.asyncio
async def test_should_do_something(self):
    application = WebsocketAuthMiddlewareStack(
        URLRouter(
            [ re_path('ws/', MockConsumer.as_asgi()),]
        )
    )

使用这种结构,您将不会收到超时错误。 当然,这提出了另一个问题:也许您应该始终发送 ACK。 Websocket 是可靠的,因为它们使用 TCP,但如果服务器(或客户端)断开连接,您不会立即知道它,并且可能会丢失一些消息。

© www.soinside.com 2019 - 2024. All rights reserved.