如何从服务器按需发送数据到客户端

问题描述 投票:0回答:1

我已经使用 asyncio 实现了一个 TCP 服务器,它定期从客户端接收数据,假设每 30 秒一次。现在,我的目标是按需向客户端发送数据。为了实现这个目标,我打算建立一种机制,可以在客户端设置特定的参数。因此,我需要为此目的将消息从服务器传输到客户端。

我尝试过实现这个,但它不起作用

这是我的代码

import asyncio
from protocol import Message  # Import your Message class from protocol.py

class TCPServer:
    """Asyncio TCP server that replies to client data and keeps each writer.

    Connected clients are tracked in ``self.clients`` under the key
    ``"ip:port"`` built from the peer address, so that other coroutines can
    write to a specific client later.
    """

    def __init__(self, host, port):
        """Store the listening address and create the empty client registry."""
        self.host = host
        self.port = port
        self.server = None  # Set by start()
        self.clients = {}  # Dictionary to store connected clients
        self.input_queue = asyncio.Queue()  # Queue for console input

    async def handle_client(self, reader, writer):
        """Serve one connection: read chunks, reply, and unregister on exit.

        Runs until the peer closes the connection (``read`` returns ``b""``)
        or an exception is raised.
        """
        message = Message()  # Create an instance of your Message class
        addr = writer.get_extra_info('peername')

        # Convert the client address to a string for consistency
        client_address = f"{addr[0]}:{addr[1]}"

        try:
            self.clients[client_address] = writer  # Store the writer for this client

            while True:
                data = await reader.read(1024)
                if not data:
                    break

                # Assuming your Message class has a `raw_data` method
                response = message.raw_data(data)

                print(f"Received {data!r} from {addr!r}")

                print(f"Send: {response!r}")
                writer.write(response.encode())
                await writer.drain()

        except ConnectionResetError as e:
            print(f"ConnectionResetError: {e}")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            print(f"Client {addr!r} closed the connection")
            writer.close()
            await writer.wait_closed()
            del self.clients[client_address]  # Remove the client from the list when they disconnect

    async def send_to_client(self, client_address, message):
        """Write ``message`` to the client registered under ``client_address``.

        Prints a notice instead of raising when the address is not registered.

        NOTE(review): ``message.encode()`` requires a ``str``, but
        ``console_input`` passes the ``bytes`` returned by ``bytes.fromhex``;
        ``bytes`` has no ``encode`` method, so that call path raises
        ``AttributeError``.
        """
        if client_address in self.clients:
            writer = self.clients[client_address]
            writer.write(message.encode())
            await writer.drain()
        else:
            print("Client not found or not connected.")

    async def start(self):
        """Bind the listening socket and serve until cancelled."""
        self.server = await asyncio.start_server(
            self.handle_client, self.host, self.port)

        addr = self.server.sockets[0].getsockname()
        print(f'Serving on {addr}')

        async with self.server:
            await self.server.serve_forever()

    async def stop(self):
        """Close the listening server if it was started."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            print("Server stopped")

    async def console_input(self):
        """For each queued client address, prompt for hex data and send it.

        NOTE(review): ``input()`` is a blocking call, so the event loop (and
        with it every client connection) is stalled while the prompt waits.
        """
        while True:
            client_address = await self.input_queue.get()
            hex_data = input("Enter the hex data to send: ")
            try:
                byte_data = bytes.fromhex(hex_data)
                await self.send_to_client(client_address, byte_data)
            except ValueError:
                print("Invalid hex data format.")


if __name__ == '__main__':
    host = '127.0.0.1'  # Change this to the desired host
    port = 8888  # Change this to the desired port

    server = TCPServer(host, port)

    async def main():
        """Run the server and the console prompt side by side."""
        try:
            server_task = asyncio.create_task(server.start())
            console_task = asyncio.create_task(server.console_input())

            # Add client address to the input queue for testing
            await asyncio.sleep(2)  # Wait for the server to start (adjust as needed)
            # NOTE(review): this is the server's own listening address, while
            # server.clients is keyed by each peer's "ip:port"; the lookup only
            # succeeds if a connected peer has exactly this address.
            client_address = '127.0.0.1:8888'
            server.input_queue.put_nowait(client_address)

            await asyncio.gather(server_task, console_task)
        except KeyboardInterrupt:
            print("KeyboardInterrupt: Server shutting down...")
            await server.stop()

    asyncio.run(main())
python python-asyncio tcpserver
1个回答
0
投票

看起来您想在Python中使用asyncio实现一个TCP服务器,它既可以定期从客户端接收数据,也可以按需向客户端发送数据。但是,您提到发送部分不起作用。为了帮助您解决此问题,我将为您的代码提供一些指导和潜在的修复方法:

确保正确的消息处理:

在 send_to_client 方法中,您使用 writer.write(message.encode()) 将数据发送到客户端。但是,您需要考虑客户端期望如何接收和处理这些数据。确保客户端知道如何解析和处理服务器发送的消息。

正确处理队列任务:

在主函数中,您正在为 server.start() 和 server.console_input() 创建任务。但是,您应该使用 await asyncio.gather(...) 等待这些任务,以确保它们同时运行。

这是经过这些更改的代码的修改版本:

import asyncio
from protocol import Message  # Import your Message class from protocol.py

class TCPServer:
    """Asyncio TCP server that replies to client data and can push data on demand.

    Every connected client's ``StreamWriter`` is kept in ``self.clients`` under
    the key ``"ip:port"`` (built from the peer address), which is the key
    ``send_to_client`` expects.
    """

    def __init__(self, host, port):
        """Store the listening address and create the empty client registry."""
        self.host = host
        self.port = port
        self.server = None  # Set by start()
        self.clients = {}  # Dictionary to store connected clients
        self.input_queue = asyncio.Queue()  # Queue for console input

    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter) -> None:
        """Serve one connection: read chunks, reply, and unregister on exit.

        Runs until the peer closes the connection (``read`` returns ``b""``)
        or an exception is raised; exceptions are printed, not re-raised.
        """
        message = Message()  # Create an instance of your Message class
        addr = writer.get_extra_info('peername')

        # Convert the client address to a string for consistency
        client_address = f"{addr[0]}:{addr[1]}"

        try:
            self.clients[client_address] = writer  # Store the writer for this client

            while True:
                data = await reader.read(1024)
                if not data:
                    break

                # Assuming your Message class has a `raw_data` method
                # (and that it returns a str, since the result is encoded below)
                response = message.raw_data(data)

                print(f"Received {data!r} from {addr!r}")

                print(f"Send: {response!r}")
                writer.write(response.encode())
                await writer.drain()

        except ConnectionResetError as e:
            print(f"ConnectionResetError: {e}")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            print(f"Client {addr!r} closed the connection")
            # Unregister before awaiting anything: if closing the transport
            # raises, a dead writer must not remain reachable for sends.
            if self.clients.get(client_address) is writer:
                del self.clients[client_address]
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionError:
                # The peer already tore the connection down; nothing to wait for.
                pass

    async def send_to_client(self, client_address, message) -> None:
        """Send ``message`` to the client registered under ``client_address``.

        ``message`` may be ``bytes`` (sent as-is) or ``str`` (encoded first).
        Prints a notice instead of raising when the address is not registered.
        """
        writer = self.clients.get(client_address)
        if writer is None:
            print("Client not found or not connected.")
            return

        # bytes has no .encode(); only text needs converting before writing.
        payload = message.encode() if isinstance(message, str) else message
        writer.write(payload)
        await writer.drain()

    async def start(self) -> None:
        """Bind the listening socket and serve until cancelled."""
        self.server = await asyncio.start_server(
            self.handle_client, self.host, self.port)

        addr = self.server.sockets[0].getsockname()
        print(f'Serving on {addr}')

        async with self.server:
            await self.server.serve_forever()

    async def stop(self) -> None:
        """Close the listening server if it was started."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            print("Server stopped")

    async def console_input(self) -> None:
        """For each queued client address, prompt for hex data and send it.

        The prompt runs in the default executor so that the event loop keeps
        serving clients while it waits for the operator. Because the worker
        thread stays blocked in ``input()``, shutdown may wait until the
        pending prompt is answered.
        """
        loop = asyncio.get_running_loop()
        while True:
            client_address = await self.input_queue.get()
            hex_data = await loop.run_in_executor(
                None, input, "Enter the hex data to send: ")
            try:
                byte_data = bytes.fromhex(hex_data)
            except ValueError:
                print("Invalid hex data format.")
                continue
            await self.send_to_client(client_address, byte_data)

if __name__ == '__main__':
    host = '127.0.0.1'  # Change this to the desired host
    port = 8888  # Change this to the desired port

    async def main():
        """Run the server and the console prompt side by side."""
        # Build the server inside the running loop so that its asyncio.Queue
        # belongs to the loop that asyncio.run() created.
        server = TCPServer(host, port)

        server_task = asyncio.create_task(server.start())
        console_task = asyncio.create_task(server.console_input())

        # Add client address to the input queue for testing
        await asyncio.sleep(2)  # Wait for the server to start (adjust as needed)
        # Placeholder: server.clients is keyed by each peer's "ip:port", so
        # replace this with the address of an actually connected client.
        client_address = '127.0.0.1:8888'
        server.input_queue.put_nowait(client_address)

        # start() closes the listening server itself when its task is
        # cancelled (it serves inside `async with self.server`).
        await asyncio.gather(server_task, console_task)

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C surfaces here, from asyncio.run(), not inside the coroutine.
        print("KeyboardInterrupt: Server shutting down...")

请注意,console_input 传给 send_to_client 的数据已经是字节格式(bytes.fromhex 的返回值),而 bytes 没有 .encode() 方法,因此在向客户端发送这类消息时不应再调用 .encode()。确保客户端也期望相同格式的数据。此外,请确保您的 protocol.py 文件已正确实现以处理服务器发送的消息。

© www.soinside.com 2019 - 2024. All rights reserved.