我是在 Python 中使用 asyncio 的新手,我无法弄清楚如何编写使用 python-websockets (和 asyncio)的测试。我想要一个连接到 websocket 并在连接后发送消息的客户端
这是我的客户类别:
import json
import websockets
class Client:
def __init__(self, interval):
self.interval = interval
async def connect(self):
async with websockets.connect("wss://test.com", ping_interval=self.interval) as websocket:
await self._send_message(websocket)
async def _send_message(self, websocket):
message = {"messageId": "499"}
await websocket.send(json.dumps(message))
这是我的测试
import json
import unittest
from unittest.mock import AsyncMock, patch
from client import Client
class ConnectionTest(unittest.IsolatedAsyncioTestCase):
async def test_message_is_sent(self):
websocket_client = Client(interval=4)
mock_websocket = AsyncMock()
with patch("client.websockets.connect", return_value=mock_websocket):
await websocket_client.connect()
message = {"messageId": "499"}
mock_websocket.send.assert_awaited_once_with(json.dumps(message))
我收到错误:
E AssertionError: Expected send to have been awaited once. Awaited 0 times.
因为这个上下文管理器正在被 AsyncMock() 替换,所以我假设我能够看到
send
是用 wait 调用的,然后能够对其进行断言。
我是否误解了它的用途?
看来您补丁错误。
尝试修补
websockets.connect
而不是 client.websockets.connect
:
with patch("websockets.connect", return_value=mock_websocket):