我正在使用 telnetlib3 和 asyncio 模块在 Python 3 中创建一个 telnet 客户端。更具体地说,telnet 客户端是与 VLC Media Player 的 telnet 接口连接的。我已经能够成功实现客户端并建立连接并通过 telnet 发出命令。客户端提示用户输入,然后将其写入主机。
我的问题: 如果在任何情况下远程主机(VLC 窗口)关闭,则客户端将继续等待输入而不是自动终止。我想在解释器等待通过
writer.is_closing()
函数输入时同时监视连接,如果它返回 true,那么我希望它退出或终止。
我的方法:我知道一种方法是在并行线程上运行监视器函数,它可以监视连接并退出脚本,但我也知道 asyncio 也可以执行并发任务,正如官方线程模块文档也建议的那样因此,为了使用asyncio,我想通过使用asyncio来实现监控功能和客户端,并减少库的导入。
我的代码:
import telnetlib3
import asyncio
import json
from helpers import LoopBreakException, send_password
# THIS IS THE FUNCTION THAT I WANT TO BE CONCURRENT
async def monitor_connection():
while True:
await asyncio.sleep(3) # Adjust the interval as needed
if writer.is_closing():
raise LoopBreakException(
"Unexpected termination of VLC. Connection lost. Exiting...")
async def connection():
global reader, writer
try:
with open("config.json") as config:
try:
configuration = json.loads(config.read())
# Telnet host, port and password
host, port, key, token = configuration["host"], configuration[
"port"], configuration['key'].encode("utf-8"), configuration['token'].encode("utf-8")
except json.JSONDecodeError as JDE:
logger.error(JDE)
return 0
except FileNotFoundError as FNFE:
logger.error(FNFE)
return 0
# Connect to the Telnet host
try:
reader, writer = await telnetlib3.open_connection(host, port)
except ConnectionRefusedError as CRE:
logger.error(CRE)
return 0
# Login to interface
print(await reader.readuntil(b"Password:"))
writer.write(send_password(key, token)+"\n")
# I KNOW THIS IS ONE WAY TO CREATE A TASK BUT
# I AM NOT ABLE TO FIGURE OUT WHERE TO AWAIT IT OR ANY OTHER WAY TO IMPLEMENT THIS
task = asyncio.create_task(monitor_connection())
del key
del token
await reader.readuntil(b">")
async def Send_command_Get_output(command):
if command == "quit":
writer.write(command + "\n")
print(await (reader.readuntil()))
writer.close()
raise LoopBreakException(
"Quit command sent to VLC. Closing connection.")
elif command == "shutdown":
writer.write(command + "\n")
try:
print(await (reader.readuntil(b"Shutting down.")))
except asyncio.IncompleteReadError as e:
print("Try failed. Doing partial read")
print(e.partial)
finally:
writer.close()
raise LoopBreakException("Shutdown completed")
writer.write(command + "\n")
try:
buffer = await reader.readuntil(b">")
except asyncio.IncompleteReadError as e:
buffer = e.partial
buffer = buffer.decode("utf-8")
print(buffer)
if writer.transport.is_closing():
# Close the connection
writer.close()
raise LoopBreakException(
"Unexpected termination of VLC. Connection lost. Exiting...")
while not writer.is_closing():
try:
command = input("VLC> ").lower()
await Send_command_Get_output(command)
except LoopBreakException as LBE:
print(LBE)
break
else:
print("Remote host terminated unexpectedly.")
# Run the main function
if __name__ == "__main__":
try:
asyncio.run(connection())
except RuntimeError as RE:
if "Event loop is closed" not in str(RE):
raise RE
我需要有关正确实现
monitor_connection()
函数或最好通过 asyncio 实现相同功能的任何其他方式的帮助。
也欢迎有关其他代码改进的帮助和建议。
这并不像你想象的那么简单......你最好换个角度思考。您必须为此使用协议。不得存在睡眠呼叫!在异步世界中,实际上有点代码味道。这是一个非常简短的示例(尽管不是复制粘贴示例)
from __future__ import annotations
from typing import *
import telnetlib3
class Connection:
def __init__(self, host: str, port: int = 23) -> None:
self._protocol: None | _Protocol = None
self._address = host, port
async def open(self):
reader, writer = await telnetlib3.open_connection(
*self._address, client_factory=self._create_protocol
)
def _create_protocol(self, **kwds):
self._protocol = protocol = _Protocol(self, **kwds)
return protocol
class _Protocol(telnetlib3.TelnetClient):
def __init__(self, connection: Connection, *args, **kwds):
super().__init__(*args, **kwds)
self.connection = connection
def eof_received(self):
# telnetlib3.TelnetClient calls the connection_lost() here and returns None.
# This causes asyncio to close the transport and register the connection_lost() to be called soon,
# Hence we have the connection_lost() that is called twice.
# Contributors know about this because they added logic similar to the "include guard"
# from C to make the connection_lost() not to do things when it's a second call.
pass
def connection_lost(self, exc: None | Exception, /) -> None:
# This is called each time when you connection is lost
...
这样您就可以访问协议内的连接实例。因此,您可以在连接丢失或类似情况时触发回调方法。注意我已经划分了这些实例(连接和协议)。如果您想要一个可重新输入的连接实例(打开、关闭并再次打开同一实例),这非常有用。你可以写
client_factory=lambda **kw: self
并使用同一个类,但是......小心,里面有陷阱:)如果你仍然感兴趣,我可以为你提供下一步要采取的步骤,因为我有一种直觉一旦连接丢失,你想在随机的地方中断你的协程。这不是一个微不足道的挑战...
你知道吗,你确定需要这么复杂的东西吗?可能处理和异常就足够了?当你在阅读时断线时你会得到...
我看到的下一个关键事情——你的
input
电话!这是一个阻塞呼叫。当您等待输入完成时,所有并发都会消失。在这种情况下您必须使用线程。
command = (await asyncio.to_thread(input, "VLC> ")).lower()
如果你真的想要并发,你最好同时等待连接丢失或输入准备好(b 首先完成的是什么)。
# NOTE: input and disconnected are your tasks to wait...
asyncio.wait((input, disconnected), return_when=asyncio.FIRST_COMPLETED)