Python 3.12 非阻塞 TCP 客户端

问题描述 投票:0回答:1

我正在尝试监听一个我无法控制的 TCP 服务器(它是一个本地 Windows 应用程序,在固定套接字上每 500 毫秒广播一次)。

我可以设置一个阅读器以在阻塞模式下获取随机长度的数据包

import asyncio, telnetlib3, time
TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000

async def get_one_line(reader):
    reply1 = []
    while True:
        c = await reader.read(1)
        if not c:
            break
        if c in ['\r', '\n']:
            break
        reply1.append(c)
    return reply1

async def main():
    reader, writer = await telnetlib3.open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
    writer.write("$SYS,INFO")
    count = 0
    while True:
        reply = []
        reply = await get_one_line(reader)
        if reply:
            print('reply:', ''.join(reply))
            del reply[:]
        print(count)
        count += 1
        if count > 10:
            break
        time.sleep(0.1)

asyncio.run(main())

并且它按预期每个 TCP 数据包打印一次“计数”(抱歉 XXX,不是我的数据)

reply: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
2
reply: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
3
reply: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
4
:
reply: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
10

当我尝试将

get_one_line()
包裹在异步函数中时,我将自己牢牢地包裹在轴上。如果我删除“10 后停止”,那么它只会打印计数而不接收任何内容。如果我在 10 点后退出,那么就会出现各种错误(我假设),因为任务仍在运行。

import asyncio, telnetlib3, time
TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000

async def get_one_line(reader):
    reply1 = []
    while True:
        c = await reader.read(1)
        if not c:
            break

        if c in ['\r', '\n']:
            break

        reply1.append(c)
    return reply1

async def get_nonblocking(reader, callback):
    reply2 = await get_one_line(reader)
    callback(reply2)

async def main():
    reader, writer = await telnetlib3.open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
    writer.write("$SYS,INFO")
    count = 0
    while True:
        reply = []
        getTask = asyncio.create_task(get_nonblocking(reader,reply.append))
        if reply:
            print('reply:', ''.join(reply))
            del reply[:]
        print(count)
        count += 1
        if count > 10:
            getTask.cancel()
            break
        time.sleep(0.1)

asyncio.run(main())

我认为我不知道如何使用 asyncio,并且 2 天的阅读没有帮助。 如有任何帮助,我们将不胜感激。

python python-asyncio tcpclient python-3.12
1个回答
0
投票

你说“我正在尝试完成它以在做其他事情的同时接收 TCP 数据包。”

看这个更简单的例子:

from random import random
from asyncio import sleep, get_event_loop, create_task


async def lines():
    # this is like your network code, yielding lines at somewhat unpredictable intervals
    n = 0
    while True:
        n += 1
        yield f'{n}\n'
        await sleep(random() * 3)


async def count_lines(reader):
    # this rountine receives lines, until it has 5 and then terminates
    count = 0
    async for line in reader:
        count += 1
        print(count, line, end='')
        if count >= 5:
            break


async def main():
    # set up the 'reader'
    reader = lines()
    # create a task for `count_lines`
    task = create_task(count_lines(reader))
    # while that task is going, do other stuff
    while not task.done():
        print('doing stuff in main...')
        await sleep(1)
    print('done!')


get_event_loop().run_until_complete(main())

输出示例:

doing stuff in main...
1 1
doing stuff in main...
doing stuff in main...
2 2
doing stuff in main...
3 3
doing stuff in main...
doing stuff in main...
doing stuff in main...
4 4
doing stuff in main...
5 5
done!

获取您的代码,并以类似的方式构建它:

from asyncio import sleep, get_event_loop, create_task
from telnetlib3 import open_connection

TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000


async def get_one_line(reader):
    # apparently, you want these lines as a list of characters
    reply1 = []
    while True:
        c = await reader.read(1)
        # this seems like a relevant case - isn't this the end?
        if not c:
            break
        if c in ['\r', '\n']:
            break
        reply1.append(c)
    return reply1


async def read_from(reader):
    count = 0
    while True:
        reply = []  # this is pointless, the next line overwrites it
        reply = await get_one_line(reader)
        if reply:
            print('reply:', ''.join(reply))
            del reply[:]  # also pointless, you're just going to overwrite it
        count += 1
        if count >= 10:
            break


async def main():
    reader, writer = await telnetlib3.open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
    writer.write("$SYS,INFO")  # supposedly, this kicks off the server - can't test
    task = create_task(read_from(reader))

    while not task.done():
        print('doing stuff in main...')
        await sleep(1)
        
    print('done!')


get_event_loop().run_until_complete(main())

我无法测试此代码,因为您没有为服务器提供代码,但它遵循相同的模式。

但是,这似乎是更简单的方法:

from asyncio import sleep, get_event_loop, create_task
from telnetlib3 import open_connection

TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000


async def get_one_line(reader):
    line = ''
    while True:
        c = await reader.read(1)
        if not c:
            break  # we're done
        line += c.decode()
        if c in ['\r', '\n']:
            yield line
            line = ''


async def read_from(reader):
    count = 0
    async for line in get_one_line(reader):
        print('reply:', line)
        # can still count, in case we want to stop after 10
        if (count := count + 1) >= 10:
            break


async def main():
    reader, writer = await telnetlib3.open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
    writer.write("$SYS,INFO")  # supposedly, this kicks off the server - can't test
    task = create_task(read_from(reader))

    while not task.done():
        print('doing stuff in main...')
        await sleep(1)

    print('done!')


get_event_loop().run_until_complete(main())

它应该在收到 10 行后停止,或者如果服务器不再响应,无论先发生什么。两者都不需要

while True
无休止的循环 - 只要我们允许,
get_one_line()
生成器就会不断生成线条。并且
count_lines
将继续从中读取数据,直到计数器到期。如果您删除了计数器,它将永远运行直到任务停止。

同时,

main()
可以做任何事情,只要循环中某处足够频繁地有await,以便控制权可以传递给读者。

© www.soinside.com 2019 - 2024. All rights reserved.