如何解决 asyncio.StreamReader 引发 OpenSSL.SSL.WantReadError

问题描述 投票:0回答:1

当使用

OpenSSL.SSL.WantReadError
执行异步 TCP 请求时,我无法从
asyncio.StreamReader
恢复。这是我的设置并尝试解决问题。

我使用以下调用来包装先前创建的套接字来创建读取器和写入器。

self.reader, self.writer = await asyncio.open_connection(sock=self.sock)

在本例中它是一个 TLS 套接字。

我最初有一些看起来像这样的代码来从套接字读取。

async def _receive_data(reader: asyncio.StreamReader, sz):
    pos = 0
    data = None
    while pos < sz:
        chunk = await reader.read(sz - pos)
        if pos == 0:
            data = chunk
        else:
            data += chunk
        pos += len(chunk)

    return data

当我尝试从服务器读取较大的响应(~300KB)时,我开始收到

OpenSSL.SSL.WantReadError
。我认为服务器还需要一些时间来制定这个特定的响应,因此该错误是有意义的。我遇到的问题是,我该如何恢复?

我尝试将我的

_receive_data
功能更改为

async def _receive_data(reader: asyncio.StreamReader, sz):
    pos = 0
    data = b""

    while pos < sz:
        try:
            chunk = await reader.read(sz - pos)
        except SSL.WantReadError:
            print(
                f"SSL.WantReadError: Waiting for more data. Received {pos} bytes of {sz} bytes."
            )
            print("reader._exception: ", repr(reader))
            reader.set_exception(None)
            await asyncio.sleep(0.1)
            continue

        if pos == 0:
            data = chunk
        else:
            data += chunk

        pos += len(chunk)

    return data

我从打印语句中得到一些输出,如下所示

SSL.WantReadError: Waiting for more data. Received 0 bytes of 8 bytes.
reader._exception: <StreamReader exception=WantReadError() transport=<_SelectorSocketTransport closed fd=15>>
SSL.WantReadError: Waiting for more data. Received 163832 bytes of 303525 bytes.
reader._exception: <StreamReader exception=WantReadError() transport=<_SelectorSocketTransport closed fd=17>>

它只打印输出两次然后挂起。我假设这是因为读取器对象上的套接字设置为 None,如

<_SelectorSocketTransport closed fd=17>>
所示。

WantReadError
的文档说明

操作未完成;稍后应再次调用相同的 I/O 方法,并使用相同的 论据。任何 I/O 方法都可能导致这种情况,因为新的握手可能随时发生。

想要读取的是通过网络发送的脏数据,而不是隧道内的干净数据。对于基于套接字的 SSL 连接,读取意味着数据通过网络传入我们。在读取成功之前,尝试的 OpenSSL.SSL.Connection.recv()、OpenSSL.SSL.Connection.send() 或 OpenSSL.SSL.Connection.do_handshake() 会被阻止或不完整。您可能想在重试之前在套接字上 select() 。

我不确定如何将该信息应用于 asyncio.Streams 的情况。理想情况下,我能够在包装的套接字上发出相同的请求,但这是通过 Streams 抽象出来的。也许解决方案是以某种方式存储失败的请求,即

sz - pos
,使用相同的套接字创建一个新的读取器/写入器,然后再次发出
sz-pos
请求?这一切都感觉太混乱了,我想假设有一种更干净的方法来处理这种情况

编辑:

我尝试了我在上一段中提到的方法,但没有成功。如果我尝试使用相同的袜子重新创建流,当我调用 asyncio.open_connection()` 时,我会得到

[Errno 9] Bad file descriptor
。我现在看到的唯一行动方案是使用不同的套接字,但这意味着我需要请求相同的信息,直到成功为止。鉴于服务器需要一些时间来制定响应,这并不理想

python python-3.x python-asyncio pyopenssl
1个回答
0
投票

我最终找出了 TLS + asyncio 遇到非典型问题的原因。我还找到了一个适合我的情况、破坏性最小的非理想解决方案。

问题是 asyncio 不能与 pyOpenSSL 模块的 OpenSSL.SSL.Context 一起开箱即用。 asyncio 旨在与内置 ssl 模块 ssl.SSLContext 一起使用。相似的上下文对象名称让我感到困惑。我想如果我使用了 ssl.SSLContext ssl.SSLWantReadError (而不是 OpenSSL.SSL.WantReadError :))将会得到正确处理,尽管我没有在 asyncio 中找到明确处理此错误的任何地方。然而,我确实看到 asyncio 适当地处理了异常 BlockingIOError,因此我决定对我的套接字进行子类化并返回适当的错误。

class _AsyncioSSLConnectionAdapter(SSL.Connection):
    def recv(self, bufsiz: int, flags: int | None = None) -> bytes:
        """Custom wrapper around SSL.Connection.recv that raises a BlockingIOError instead of SSL.WantReadError
        since pyOpenSSL.SSL.Connection (basically a socket) is not compatible with asyncio streams and raises SSL.WantReadError
        in cases when no data is available to read. If we raise SSL.WantReadError asyncio will close the underlying socket
        and not allow retries. By raising BlockingIOError asyncio will retry the read operation.
        """
        try:
            ssl.SSLWantReadError 
            return super().recv(bufsiz, flags)
        except SSL.WantReadError:
            raise BlockingIOError("Wrapped SSL.WantReadError")

这是我能找到的侵入性最小的解决方案,无需用 ssl.SSLContext 替换对 OpenSSL.SSL.Context 的使用,由于 pyOpenSSL 的添加功能(作为

OpenSSL
导入),这不适用于我的用例。

我发现的另一个解决方案是创建我自己的使用

Transport
OpenSSL.SSL.Context
类,但这也需要更多的工作才能正确实现。有一个名为 aioopenssl 的项目可以做到这一点,但它似乎不够成熟且维护不够积极,不足以让我将其作为新的依赖项引入。

© www.soinside.com 2019 - 2024. All rights reserved.