如何将 Python 多线程套接字客户端/服务器转换为 asyncio?

问题描述 投票:0回答:1

我正在编写一个工具来端到端测试基于异步的服务器。最初,我打算在一个终端窗口中启动服务器,并在一个单独的窗口中运行测试,但后来我意识到应该可以在一个脚本中执行此操作。毕竟,我可以用

concurrent.futures.ThreadPoolExecutor
来完成,但我很难使用
await
/
async def
来转换逻辑。

这是一个使用 TPE 的工作示例:

import argparse
import socket
import concurrent.futures
import threading
import socketserver


class TCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print(f'Got data: {self.request.recv(1024).strip().decode()}')

def started_server(*, server):
    print('starting server')
    server.serve_forever()
    print('server thread closing')


def run_client(*, host, port, server):
    print('client started, attempting connection')
    with socket.create_connection((host, port), timeout=0.5) as conn:
        print('connected')
        conn.send(b'hello werld')
    print('closing server')
    server.shutdown()
    print('cancelled')

def test_the_server(*, host, port):
    ex = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    print('server a')
    quitter = threading.Event()
    server = socketserver.TCPServer((host, port), TCPHandler)
    a = ex.submit(started_server, server=server)
    b = ex.submit(run_client, host=host, port=port, server=server)
    print(a.result(), b.result())
    print('server b')


def do_it():  # Shia LeBeouf!
    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("-p", "--port", type=int, default=60025)
    args = parser.parse_args()

    exit(test_the_server(host=args.host, port=args.port))


if __name__ == "__main__":
    do_it()

我如何将其转换为使用异步循环?我很确定我需要在线程中生成服务器异步循环,但到目前为止它只是阻塞,并且有关 SO 的其他问题未能提供解决方案(或已过时)。

这是一个对我来说失败的例子:

import asyncio
import argparse
import socket
import concurrent.futures
import threading
import socketserver


class EchoHandler(asyncio.Protocol):
    def data_received(self, data):
        print(f"Got this data: {data.decode()}")

async def run_server(*, server):
    print('starting server')
    server = await server
    async with server:
        print('start serving')
        await server.start_serving()
        print('waiting on close')
        await server.wait_closed()
    print('server coro closing')

def started_server(*, server):
    print('server thread started')
    asyncio.run(run_server(server=server))
    print('server thread finished')

def run_client(*, host, port, server):
    print('client started, attempting connection')
    with socket.create_connection((host, port), timeout=0.5) as conn:
        print('connected')
        conn.send(b'hello werld')
    print('closing server')
    server.close()
    print('cancelled')

async def fnord(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    print('got', message)

def test_the_server(*, host, port):
    ex = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    print('server a')
    quitter = threading.Event()
    #server = socketserver.TCPServer((host, port), TCPHandler)
    server = asyncio.start_server(fnord, host, port)
    a = ex.submit(started_server, server=server)
    b = ex.submit(run_client, host=host, port=port, server=server)
    print(a.result(), b.result())
    print('server b')


def do_it():  # Shia LeBeouf!
    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("-p", "--port", type=int, default=60025)
    args = parser.parse_args()

    exit(test_the_server(host=args.host, port=args.port))


if __name__ == "__main__":
    do_it()

我希望 https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server.wait_close 就足够了,当我在另一个线程上调用

server.close()
时,它会关闭服务器,但似乎并非如此。
serve_forever
的行为与
start_serving
方法相同。

python multithreading async-await python-asyncio
1个回答
0
投票

正如我在评论中提到的,我不太确定你在寻找什么,但我想我会想出一个例子,其中你在同一个中运行 TCP 服务器和客户端

asyncio
事件循环。我扩充了
asyncio
的示例来制作此示例。

import asyncio


async def test_client(message):
    reader, writer = await asyncio.open_connection("127.0.0.1", 9000)

    writer.write(f"{message}\n".encode())
    await writer.drain()
    print(f"The test client send the message: {message}")

    data = await reader.readline()
    response = data.decode().strip()
    print(f"The test client received the response: {response}")

    writer.close()
    await writer.wait_closed()


async def handle_echo(reader, writer):
    data = await reader.readline()
    message = data.decode().strip()
    print(f"Server received message: {message}")

    writer.write(f"{message}\n".encode())
    await writer.drain()

    writer.close()
    await writer.wait_closed()


async def server():
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 9000)

    async with server:
        await server.serve_forever()


async def main():
    await asyncio.gather(
        server(),
        test_client("This is only a test"),
    )


asyncio.run(main())

这个例子并不是一个很好的脚本,因为服务器将永远运行直到

ctrl + c

这是一个直接使用任务的示例(请注意,

asyncio.gather
将传递给它的协程转换为任务),如果您有对任务的引用,则可以取消该任务。除了更改为
main
和额外的协程之外,一切都是一样的。

async def cancel_task_after(task: asyncio.Task, seconds_to_wait: int) -> None:
    await asyncio.sleep(seconds_to_wait)
    task.cancel()
    return None


async def main():
    async with asyncio.TaskGroup() as task_group:
        server_task = task_group.create_task(server())
        client_task = task_group.create_task(test_client("This is only a test"))
        cancel_server_task = task_group.create_task(cancel_task_after(server_task, 3))


asyncio.run(main())
© www.soinside.com 2019 - 2024. All rights reserved.