我正在尝试监听一个我无法控制的 TCP 服务器(它是一个本地 Windows 应用程序,在固定套接字上每 500 毫秒广播一次)。
我可以设置一个阅读器以在阻塞模式下获取随机长度的数据包
import asyncio, telnetlib3, time
TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000
async def get_one_line(reader):
reply1 = []
while True:
c = await reader.read(1)
if not c:
break
if c in ['\r', '\n']:
break
reply1.append(c)
return reply1
async def main():
reader, writer = await telnetlib3.open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
writer.write("$SYS,INFO")
count = 0
while True:
reply = []
reply = await get_one_line(reader)
if reply:
print('reply:', ''.join(reply))
del reply[:]
print(count)
count += 1
if count > 10:
break
time.sleep(0.1)
asyncio.run(main())
并且它按预期每个 TCP 数据包打印一次“计数”(抱歉 XXX,不是我的数据)
reply: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
2
reply: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
3
reply: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
4
:
reply: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
10
当我尝试将
get_one_line()
包裹在异步函数中时,我将自己牢牢地包裹在轴上。如果我删除“10 后停止”,那么它只会打印计数而不接收任何内容。如果我在 10 点后退出,那么就会出现各种错误(我假设),因为任务仍在运行。
import asyncio, telnetlib3, time
TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000
async def get_one_line(reader):
reply1 = []
while True:
c = await reader.read(1)
if not c:
break
if c in ['\r', '\n']:
break
reply1.append(c)
return reply1
async def get_nonblocking(reader, callback):
reply2 = await get_one_line(reader)
callback(reply2)
async def main():
reader, writer = await telnetlib3.open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
writer.write("$SYS,INFO")
count = 0
while True:
reply = []
getTask = asyncio.create_task(get_nonblocking(reader,reply.append))
if reply:
print('reply:', ''.join(reply))
del reply[:]
print(count)
count += 1
if count > 10:
getTask.cancel()
break
time.sleep(0.1)
asyncio.run(main())
我认为我不知道如何使用 asyncio,并且 2 天的阅读没有帮助。 如有任何帮助,我们将不胜感激。
你说“我正在尝试完成它以在做其他事情的同时接收 TCP 数据包。”
看这个更简单的例子:
from random import random
from asyncio import sleep, get_event_loop, create_task
async def lines():
# this is like your network code, yielding lines at somewhat unpredictable intervals
n = 0
while True:
n += 1
yield f'{n}\n'
await sleep(random() * 3)
async def count_lines(reader):
# this rountine receives lines, until it has 5 and then terminates
count = 0
async for line in reader:
count += 1
print(count, line, end='')
if count >= 5:
break
async def main():
# set up the 'reader'
reader = lines()
# create a task for `count_lines`
task = create_task(count_lines(reader))
# while that task is going, do other stuff
while not task.done():
print('doing stuff in main...')
await sleep(1)
print('done!')
get_event_loop().run_until_complete(main())
输出示例:
doing stuff in main...
1 1
doing stuff in main...
doing stuff in main...
2 2
doing stuff in main...
3 3
doing stuff in main...
doing stuff in main...
doing stuff in main...
4 4
doing stuff in main...
5 5
done!
获取您的代码,并以类似的方式构建它:
from asyncio import sleep, get_event_loop, create_task
from telnetlib3 import open_connection
TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000
async def get_one_line(reader):
# apparently, you want these lines as a list of characters
reply1 = []
while True:
c = await reader.read(1)
# this seems like a relevant case - isn't this the end?
if not c:
break
if c in ['\r', '\n']:
break
reply1.append(c)
return reply1
async def read_from(reader):
count = 0
while True:
reply = [] # this is pointless, the next line overwrites it
reply = await get_one_line(reader)
if reply:
print('reply:', ''.join(reply))
del reply[:] # also pointless, you're just going to overwrite it
count += 1
if count >= 10:
break
async def main():
reader, writer = await telnetlib3.open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
writer.write("$SYS,INFO") # supposedly, this kicks off the server - can't test
task = create_task(read_from(reader))
while not task.done():
print('doing stuff in main...')
await sleep(1)
print('done!')
get_event_loop().run_until_complete(main())
我无法测试此代码,因为您没有为服务器提供代码,但它遵循相同的模式。
但是,这似乎是更简单的方法:
from asyncio import sleep, get_event_loop, create_task
from telnetlib3 import open_connection
TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000
async def get_one_line(reader):
line = ''
while True:
c = await reader.read(1)
if not c:
break # we're done
line += c.decode()
if c in ['\r', '\n']:
yield line
line = ''
async def read_from(reader):
count = 0
async for line in get_one_line(reader):
print('reply:', line)
# can still count, in case we want to stop after 10
if (count := count + 1) >= 10:
break
async def main():
reader, writer = await telnetlib3.open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
writer.write("$SYS,INFO") # supposedly, this kicks off the server - can't test
task = create_task(read_from(reader))
while not task.done():
print('doing stuff in main...')
await sleep(1)
print('done!')
get_event_loop().run_until_complete(main())
它应该在收到 10 行后停止,或者如果服务器不再响应,无论先发生什么。两者都不需要
while True
无休止的循环 - 只要我们允许,get_one_line()
生成器就会不断生成线条。并且 count_lines
将继续从中读取数据,直到计数器到期。如果您删除了计数器,它将永远运行直到任务停止。
同时,
main()
可以做任何事情,只要循环中某处足够频繁地有await,以便控制权可以传递给读者。