如何使用 asyncio 连续监控 telnetlib3 连接以从主机端终止

问题描述 投票:0回答:1

我正在使用 telnetlib3 和 asyncio 模块在 Python 3 中创建一个 telnet 客户端。更具体地说,telnet 客户端是与 VLC Media Player 的 telnet 接口连接的。我已经能够成功实现客户端并建立连接并通过 telnet 发出命令。客户端提示用户输入,然后将其写入主机。

我的问题: 如果在任何情况下远程主机(VLC 窗口)关闭,则客户端将继续等待输入而不是自动终止。我想在解释器等待通过

writer.is_closing()
函数输入时同时监视连接,如果它返回 true,那么我希望它退出或终止。

我的方法:我知道一种方法是在并行线程上运行监视器函数,它可以监视连接并退出脚本,但我也知道 asyncio 也可以执行并发任务,正如官方线程模块文档也建议的那样因此,为了使用asyncio,我想通过使用asyncio来实现监控功能和客户端,并减少库的导入。

我的代码:

import telnetlib3
import asyncio
import json
from helpers import LoopBreakException, send_password

# THIS IS THE FUNCTION THAT I WANT TO BE CONCURRENT
async def monitor_connection():
    while True:
        await asyncio.sleep(3)  # Adjust the interval as needed
        if writer.is_closing():
            raise LoopBreakException(
                "Unexpected termination of VLC. Connection lost. Exiting...")


async def connection():
    global reader, writer
        
    try:
        with open("config.json") as config:
            try:
                configuration = json.loads(config.read())
                # Telnet host, port and password
                host, port, key, token = configuration["host"], configuration[
                    "port"], configuration['key'].encode("utf-8"), configuration['token'].encode("utf-8")
            except json.JSONDecodeError as JDE:
                logger.error(JDE)
                return 0
    except FileNotFoundError as FNFE:
        logger.error(FNFE)
        return 0

    # Connect to the Telnet host
    try:
        reader, writer = await telnetlib3.open_connection(host, port)
    except ConnectionRefusedError as CRE:
        logger.error(CRE)
        return 0
    # Login to interface

    print(await reader.readuntil(b"Password:"))
    writer.write(send_password(key, token)+"\n")
    # I KNOW THIS IS ONE WAY TO CREATE A TASK BUT
    # I AM NOT ABLE TO FIGURE OUT WHERE TO AWAIT IT OR ANY OTHER WAY TO IMPLEMENT THIS
    task = asyncio.create_task(monitor_connection())
    del key
    del token
    await reader.readuntil(b">")

    async def Send_command_Get_output(command):
        if command == "quit":
            writer.write(command + "\n")
            print(await (reader.readuntil()))
            writer.close()
            raise LoopBreakException(
                "Quit command sent to VLC. Closing connection.")
        elif command == "shutdown":
            writer.write(command + "\n")
            try:
                print(await (reader.readuntil(b"Shutting down.")))
            except asyncio.IncompleteReadError as e:
                print("Try failed. Doing partial read")
                print(e.partial)
            finally:
                writer.close()
                raise LoopBreakException("Shutdown completed")
        writer.write(command + "\n")
        try:
            buffer = await reader.readuntil(b">")
        except asyncio.IncompleteReadError as e:
            buffer = e.partial
        buffer = buffer.decode("utf-8")
        print(buffer)
        if writer.transport.is_closing():
            # Close the connection
            writer.close()
            raise LoopBreakException(
                "Unexpected termination of VLC. Connection lost. Exiting...")

    while not writer.is_closing():
        try:
            command = input("VLC> ").lower()
            await Send_command_Get_output(command)
        except LoopBreakException as LBE:
            print(LBE)
            break
    else:
        print("Remote host terminated unexpectedly.")

# Run the main function
if __name__ == "__main__":
    try:
        asyncio.run(connection())
    except RuntimeError as RE:
        if "Event loop is closed" not in str(RE):
            raise RE

我需要有关正确实现

monitor_connection()
函数或最好通过 asyncio 实现相同功能的任何其他方式的帮助。

也欢迎有关其他代码改进的帮助和建议。

python multithreading asynchronous networking python-multithreading
1个回答
0
投票

这并不像你想象的那么简单......你最好换个角度思考。您必须为此使用协议。不得存在睡眠呼叫!在异步世界中,实际上有点代码味道。这是一个非常简短的示例(尽管不是复制粘贴示例)

from __future__ import annotations

from typing import *

import telnetlib3


class Connection:
    def __init__(self, host: str, port: int = 23) -> None:
        self._protocol: None | _Protocol = None
        self._address = host, port

    async def open(self):
        reader, writer = await telnetlib3.open_connection(
            *self._address, client_factory=self._create_protocol
        )

    def _create_protocol(self, **kwds):
        self._protocol = protocol = _Protocol(self, **kwds)
        return protocol


class _Protocol(telnetlib3.TelnetClient):
    def __init__(self, connection: Connection, *args, **kwds):
        super().__init__(*args, **kwds)
        self.connection = connection

    def eof_received(self):
        # telnetlib3.TelnetClient calls the connection_lost() here and returns None.
        # This causes asyncio to close the transport and register the connection_lost() to be called soon,
        # Hence we have the connection_lost() that is called twice.
        # Contributors know about this because they added logic similar to the "include guard"
        # from C to make the connection_lost() not to do things when it's a second call.
        pass

    def connection_lost(self, exc: None | Exception, /) -> None:
        # This is called each time when you connection is lost
        ...

这样您就可以访问协议内的连接实例。因此,您可以在连接丢失或类似情况时触发回调方法。注意我已经划分了这些实例(连接和协议)。如果您想要一个可重新输入的连接实例(打开、关闭并再次打开同一实例),这非常有用。你可以写

client_factory=lambda **kw: self
并使用同一个类,但是......小心,里面有陷阱:)如果你仍然感兴趣,我可以为你提供下一步要采取的步骤,因为我有一种直觉一旦连接丢失,你想在随机的地方中断你的协程。这不是一个微不足道的挑战...

你知道吗,你确定需要这么复杂的东西吗?可能处理和异常就足够了?当你在阅读时断线时你会得到...

我看到的下一个关键事情——你的

input
电话!这是一个阻塞呼叫。当您等待输入完成时,所有并发都会消失。在这种情况下您必须使用线程。

command = (await asyncio.to_thread(input, "VLC> ")).lower()

如果你真的想要并发,你最好同时等待连接丢失或输入准备好(b 首先完成的是什么)。

# NOTE: input and disconnected are your tasks to wait...
asyncio.wait((input, disconnected), return_when=asyncio.FIRST_COMPLETED)
© www.soinside.com 2019 - 2024. All rights reserved.