我有一个用例,其中有很多 TCP/IP 客户端被捆绑到一个 Python 进程中。目前的愿望是使用
asyncio
来为程序提供并发性。对于所有新 TCP/IP 客户端,将使用 asyncio
流,对于所有新 HTTP 客户端,将使用 aiohttp
。
socket
模块编写的。我的问题是:您应该如何使用 socket
“包装”现有的基于 async
的类和方法?
现有 TCP/IP 客户端示例:
import socket
class ExistingClient:
def __init__(self, host: str, port: int) -> None:
self.__host = host
self.__port = port
self.__socket: socket.socket | None = None
def initialize(self) -> None:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.connect((self.__host, self.__port))
def get_status(self) -> int:
self.__socket.send("2\n".encode())
data: str = str(self.__socket.recv(1024).decode()).strip()
return int(data)
def close(self) -> None:
self.__socket.close()
您可以运行此服务器脚本作为以下客户端脚本的示例。保存在
server.py
。
import asyncio
import functools
class State:
def __init__(self) -> None:
self.__count: int = 0
def add_to_count(self, count: int) -> None:
self.__count = self.__count + count
@property
def count(self) -> int:
return self.__count
async def handle_echo(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter, state: State
):
# Ignore the use of the infinite while loop for this example.
# Controlling the loop would be handled in a more sophisticated way.
while True:
data = await reader.readline()
message: int = int(data.decode())
state.add_to_count(message)
addr = writer.get_extra_info("peername")
writer.write(f"{state.count}\n".encode())
await writer.drain()
writer.close()
await writer.wait_closed()
async def server(port: int):
state = State()
partial = functools.partial(handle_echo, state=state)
server = await asyncio.start_server(partial, "127.0.0.1", port)
address = ", ".join(str(sock.getsockname()) for sock in server.sockets)
print(f"Serving on {address}")
async with server:
await server.serve_forever()
async def main():
await asyncio.gather(
server(8888),
server(8889),
)
asyncio.run(main())
对我来说,一种解决方案似乎是现有的类可以使用
async
方法进行编辑,这些方法只是推迟到阻塞 I/O socket
调用。例如:
import socket
class ExistingClient:
def __init__(self, host: str, port: int) -> None:
self.__host = host
self.__port = port
self.__socket: socket.socket | None = None
def initialize(self) -> None:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.connect((self.__host, self.__port))
async def async_initialize(self) -> None:
self.initialize()
def get_status(self) -> int:
self.__socket.send("2\n".encode())
data: str = str(self.__socket.recv(1024).decode()).strip()
return int(data)
async def async_get_status(self) -> int:
return self.get_status()
def close(self) -> None:
self.__socket.close()
async def async_close(self) -> None:
self.close()
这是否在不阻塞
asyncio
事件循环的意义上起作用,使其行为与 asyncio
流调用非常相似?举个例子,我设想这样的东西如何与正常的 asyncio
代码集成(保存在与上面直接修改的 ExistingClient
与 async
中的 client.py
方法相同的文件中):
import asyncio
async def streams_tcp_client():
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
writer.write("1\n".encode())
await writer.drain()
data: bytes = await reader.readline()
print(f"asyncio streams data: {data.decode().strip()}")
writer.close()
await writer.wait_closed()
async def existing_tcp_client1():
existing_client = ExistingClient("127.0.0.1", 8889)
await existing_client.async_initialize()
data: int = await existing_client.async_get_status()
print(f"Existing client data: {data}")
await existing_client.async_close()
async def main():
await asyncio.gather(
streams_tcp_client(),
existing_tcp_client1(),
)
这是否如人们所期望的那样工作,使得包含阻塞 I/O 调用的
ExistingClient
async
调用,不阻塞asyncio
事件循环?
我已经运行了这段代码,它运行并打印出预期的数据。但尚不清楚如何测试事件循环是否按预期或期望运行。
asyncio.to_thread
。在文档中,它指出:
此协程函数主要用于执行 IO 绑定函数/方法,否则如果它们在主线程中运行,则会阻塞事件循环。
然而,这并不能真正解释任何事情。它并没有解释为什么简单地定义
async def async_blocking_io():
blocking_io()
不足以不阻塞事件循环。因为实际的阻塞 I/O(例如 TCP/IP 写入和读取操作)不应阻塞事件循环的线程,对吗?
使用方法是否如下:
async def existing_tcp_client2():
existing_client = ExistingClient("127.0.0.1", 8889)
await asyncio.to_thread(existing_client.initialize)
data: int = await asyncio.to_thread(existing_client.get_status)
print(f"Existing client data with to_thread: {data}")
await asyncio.to_thread(existing_client.close)
如果我将
main
修改为:,则会运行:
async def main():
await asyncio.gather(
streams_tcp_client(),
existing_tcp_client1(),
)
制作
async def async_<existing_method>
方法与 asyncio.to_thread
方法有什么区别? asyncio.to_thread
方法令人担忧,因为每个调用都会在一个新线程中运行?对于线程不安全的类来说,这可能是一个问题,并且还会通过不断生成新线程来产生开销。
这个问题还有哪些其他解决方案?
解决方案 1 - 只需使用
async
包装器包装同步函数 - 仍然会阻塞您的主进程。在 TCP 服务器/客户端中,您可以尝试通过重写向套接字发送数据或从套接字读取数据的较低级别函数来解决此问题,使这些较低级别的函数也是异步的,以较小的块处理通信。但如果您不这样做,那么解决方案 1 无法为您提供更多并发性。
如果你简单地定义
async def async_blocking_io():
blocking_io()
那么这将阻止该过程。即使您添加如下正式的等待语句:
async def async_blocking_io():
await asyncio.sleep(0) # first gives other tasks a chance
blocking_io()
现在其他协同例程可能会先运行,但您仍然会阻塞同步
blocking_io()
调用,因为底层 TCP 读写操作也是同步的并且会阻塞(如果您将套接字置于非阻塞模式) ,您需要阻止选择调用才能知道数据何时可用)。
asyncio.to_thread
函数确实在单独的线程中运行,因为这是防止阻塞的唯一方法(当使用简单的高级包装器时)。所以,如果你经常调用这个,你确实会有很多开销,这可能会违背使用 asyncio 的目的。
没有简单的出路。要么选择
asyncio.to_thread
的便利性(有一定的开销),要么使用 asyncio.streams
完全重写所有代码,包括现有的客户端,或者半途而废,使现有的高级同步代码异步(但这只有在您还可以使它们的最低级别功能 - 与套接字通信 - 异步时才会有帮助)。