我正在尝试运行以下测试:
测试.py
from rest_framework.test import APITestCase
from myapp.routing import application
from channels.testing import WebsocketCommunicator
from account.models import User
from rest_framework.authtoken.models import Token
class Tests(APITestCase):
def setUp(self):
self.user = User.objects.create(email='[email protected]',
password='a password')
self.token, created = Token.objects.get_or_create(user=self.user)
async def test_connect(self):
communicator = WebsocketCommunicator(application, f"/ws/user/{self.token}/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
application
是 channels.routing.ProtocolTypeRouter
的样板实例(如下所示:https://channels.readthedocs.io/en/latest/topics/routing.html)。生产中一切正常。测试退出并出现以下错误:
Traceback (most recent call last):
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 74, in receive_output
return await self.output_queue.get()
File "/usr/lib/python3.7/asyncio/queues.py", line 159, in get
await getter
concurrent.futures._base.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/sync.py", line 223, in __call__
return call_result.result()
File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
return self.__get_result()
File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/sync.py", line 292, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/home/projects/myapp/myapp-api/app/tests.py", line 35, in test_connect
connected, subprotocol = await communicator.connect()
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/channels/testing/websocket.py", line 36, in connect
response = await self.receive_output(timeout)
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 85, in receive_output
raise e
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 74, in receive_output
return await self.output_queue.get()
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/timeout.py", line 66, in __aexit__
self._do_exit(exc_type)
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/timeout.py", line 103, in _do_exit
raise asyncio.TimeoutError
concurrent.futures._base.TimeoutError
----------------------------------------------------------------------
Ran 1 test in 1.026s
我已经尝试使用通道 3.0.4 和 django 3.2.10 和channels-redis 3.3.1(
'BACKEND': 'channels_redis.core.RedisChannelLayer'
在settings.py中)使用python版本3.7.5、3.8.0和3.9.9。错误仍然存在。我做错了什么?
我也有同样的问题。 APITestCase 或 TestCase 不允许事务,您必须使用 django 测试中的 SimpleTestCase,并将数据库设置为全部。正是有了这个区别,我认为它会起作用。 请注意,交易将在测试之间保存,并且在测试后不会回滚。
from django.test import SimpleTestCase
from myapp.routing import application
from channels.testing import WebsocketCommunicator
from account.models import User
from rest_framework.authtoken.models import Token
class Tests(SimpleTestCase):
databases = '__all__'
def setUp(self):
self.user = User.objects.create(email='[email protected]', password='a password')
self.token, created = Token.objects.get_or_create(user=self.user)
async def test_connect(self):
communicator = WebsocketCommunicator(application, f"/ws/user/{self.token}/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
这里是SimpleTestCase的信息 https://docs.djangoproject.com/en/4.0/topics/testing/tools/
我遇到了类似的问题,解决方案通常是模仿您的生产路由器进行测试,即在实例化您的 Communicator 时还应该添加生产中使用的任何中间件或附加组件。例如在我的 asgi.py 中我有:
application = ProtocolTypeRouter(
{
"http": get_asgi_application(),
"websocket": AllowedHostsOriginValidator(
jwt_auth_middleware_stack(URLRouter(chat.routing.websocket_urlpatterns)),
),
}
)
我的通讯器实例化如下:
communicator = WebsocketCommunicator(jwt_auth_middleware_stack(URLRouter(websocket_urlpatterns)),
f"/ws/chat/{chat.id}/?token={token}")
我的网址是:
websocket_urlpatterns = [
path("ws/chat/<str:chat_id>/", consumers.AsyncChatConsumer.as_asgi())
]
我遇到了同样的问题,后来意识到
receive_from
方法需要响应。因此,如果您的消费者在收到消息时没有回复任何内容,您将收到超时错误。您可以通过在 await self.send_json("ok")
方法末尾添加 receive_json
来测试这一点。
解决方案相当简单:
class MockConsumer(Consumer):
@override
async def receive_json(self, content: dict, **kwargs):
await super().receive_json(content, **kwargs)
await self.send_json(content)
@pytest.mark.asyncio
async def test_should_do_something(self):
application = WebsocketAuthMiddlewareStack(
URLRouter(
[ re_path('ws/', MockConsumer.as_asgi()),]
)
)
使用这种结构,您将不会收到超时错误。 当然,这提出了另一个问题:也许您应该始终发送 ACK。 Websocket 是可靠的,因为它们使用 TCP,但如果服务器(或客户端)断开连接,您不会立即知道它,并且可能会丢失一些消息。