我正在编写一个工具来端到端测试基于异步的服务器。最初,我打算在一个终端窗口中启动服务器,并在一个单独的窗口中运行测试,但后来我意识到应该可以在一个脚本中执行此操作。毕竟,我可以用
concurrent.futures.ThreadPoolExecutor
来完成,但我很难使用 await
/async def
来转换逻辑。
这是一个使用 TPE 的工作示例:
import argparse
import socket
import concurrent.futures
import threading
import socketserver
class TCPHandler(socketserver.BaseRequestHandler):
def handle(self):
print(f'Got data: {self.request.recv(1024).strip().decode()}')
def started_server(*, server):
print('starting server')
server.serve_forever()
print('server thread closing')
def run_client(*, host, port, server):
print('client started, attempting connection')
with socket.create_connection((host, port), timeout=0.5) as conn:
print('connected')
conn.send(b'hello werld')
print('closing server')
server.shutdown()
print('cancelled')
def test_the_server(*, host, port):
ex = concurrent.futures.ThreadPoolExecutor(max_workers=3)
print('server a')
quitter = threading.Event()
server = socketserver.TCPServer((host, port), TCPHandler)
a = ex.submit(started_server, server=server)
b = ex.submit(run_client, host=host, port=port, server=server)
print(a.result(), b.result())
print('server b')
def do_it(): # Shia LeBeouf!
parser = argparse.ArgumentParser(usage=__doc__)
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("-p", "--port", type=int, default=60025)
args = parser.parse_args()
exit(test_the_server(host=args.host, port=args.port))
if __name__ == "__main__":
do_it()
我如何将其转换为使用异步循环?我很确定我需要在线程中生成服务器异步循环,但到目前为止它只是阻塞,并且有关 SO 的其他问题未能提供解决方案(或已过时)。
这是一个对我来说失败的例子:
import asyncio
import argparse
import socket
import concurrent.futures
import threading
import socketserver
class EchoHandler(asyncio.Protocol):
def data_received(self, data):
print(f"Got this data: {data.decode()}")
async def run_server(*, server):
print('starting server')
server = await server
async with server:
print('start serving')
await server.start_serving()
print('waiting on close')
await server.wait_closed()
print('server coro closing')
def started_server(*, server):
print('server thread started')
asyncio.run(run_server(server=server))
print('server thread finished')
def run_client(*, host, port, server):
print('client started, attempting connection')
with socket.create_connection((host, port), timeout=0.5) as conn:
print('connected')
conn.send(b'hello werld')
print('closing server')
server.close()
print('cancelled')
async def fnord(reader, writer):
data = await reader.read(100)
message = data.decode()
print('got', message)
def test_the_server(*, host, port):
ex = concurrent.futures.ThreadPoolExecutor(max_workers=3)
print('server a')
quitter = threading.Event()
#server = socketserver.TCPServer((host, port), TCPHandler)
server = asyncio.start_server(fnord, host, port)
a = ex.submit(started_server, server=server)
b = ex.submit(run_client, host=host, port=port, server=server)
print(a.result(), b.result())
print('server b')
def do_it(): # Shia LeBeouf!
parser = argparse.ArgumentParser(usage=__doc__)
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("-p", "--port", type=int, default=60025)
args = parser.parse_args()
exit(test_the_server(host=args.host, port=args.port))
if __name__ == "__main__":
do_it()
我希望 https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server.wait_close 就足够了,当我在另一个线程上调用
server.close()
时,它会关闭服务器,但似乎并非如此。 serve_forever
的行为与 start_serving
方法相同。
正如我在评论中提到的,我不太确定你在寻找什么,但我想我会想出一个例子,其中你在同一个中运行 TCP 服务器和客户端
asyncio
事件循环。我扩充了 asyncio
流 的示例来制作此示例。
import asyncio
async def test_client(message):
reader, writer = await asyncio.open_connection("127.0.0.1", 9000)
writer.write(f"{message}\n".encode())
await writer.drain()
print(f"The test client send the message: {message}")
data = await reader.readline()
response = data.decode().strip()
print(f"The test client received the response: {response}")
writer.close()
await writer.wait_closed()
async def handle_echo(reader, writer):
data = await reader.readline()
message = data.decode().strip()
print(f"Server received message: {message}")
writer.write(f"{message}\n".encode())
await writer.drain()
writer.close()
await writer.wait_closed()
async def server():
server = await asyncio.start_server(handle_echo, "127.0.0.1", 9000)
async with server:
await server.serve_forever()
async def main():
await asyncio.gather(
server(),
test_client("This is only a test"),
)
asyncio.run(main())
这个例子并不是一个很好的脚本,因为服务器将永远运行直到
ctrl + c
。
这是一个直接使用任务的示例(请注意,
asyncio.gather
将传递给它的协程转换为任务),如果您有对任务的引用,则可以取消该任务。除了更改为 main
和额外的协程之外,一切都是一样的。
async def cancel_task_after(task: asyncio.Task, seconds_to_wait: int) -> None:
await asyncio.sleep(seconds_to_wait)
task.cancel()
return None
async def main():
async with asyncio.TaskGroup() as task_group:
server_task = task_group.create_task(server())
client_task = task_group.create_task(test_client("This is only a test"))
cancel_server_task = task_group.create_task(cancel_task_after(server_task, 3))
asyncio.run(main())