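I want to write a simple telnet server that handles user keystrokes as they are typed, without requiring the user to press Enter. In Python, this can be done with the telnetlib3 library and the following code: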
import asyncio
import telnetlib3
from telnetlib3 import TelnetReaderUnicode, TelnetWriterUnicode

async def shell(reader: TelnetReaderUnicode, writer: TelnetWriterUnicode):
    # Handle a single chunk of input; returns True when the session should end
    async def on_input(inp: str):
        match inp:
            case '\x1b[C' | 'd':  # Right
                writer.write('\x1b[C')
            case '\x1b[D' | 'a':  # Left
                writer.write('\x1b[D')
            case '\x1b' | 'q' | '\x03':  # Escape or q or Ctrl+C
                writer.write('\r\nBye!\r\n')
                writer.close()
                return True
            case _:
                print(f'Unknown input: {repr(inp)}')

    # Read up to 3 characters at a time so arrow-key escape sequences fit in one read
    while True:
        inp: str = await reader.read(3)
        if inp and await on_input(inp):
            return

if __name__ == '__main__':
    # Create a new event loop, start the server and wait for it to close
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    coro = telnetlib3.create_server(port=2323, shell=shell)
    server = loop.run_until_complete(coro)
    loop.run_until_complete(server.wait_closed())
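In Rust, I built a similar TCP server using Tokio with split read and write halves, but input only shows up on the server after the telnet client presses Enter, no matter how small the read buffer is (in this case I set it to 3 bytes). Is there a way to use unbuffered I/O to collect the client's keystrokes directly, without waiting for a newline?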
use std::io::Result;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpListener,
};

#[tokio::main]
async fn main() -> Result<()> {
    const ADDR: &str = "127.0.0.1:2323";
    let listener = TcpListener::bind(ADDR).await?;
    println!("Server running on {ADDR}");

    while let Ok((mut socket, _addr)) = listener.accept().await {
        // Spawn a task to handle the socket
        tokio::spawn(async move {
            let (mut reader, mut writer) = socket.split();

            // Put the terminal into raw mode to receive escape sequences
            writer.write_all(b"\x1B[?25l").await.unwrap(); // Hide the cursor
            writer.write_all(b"\x1B[?1049h").await.unwrap(); // Save cursor position and switch to alternate screen
            writer.flush().await.unwrap();

            // Send a welcome message to the client
            writer.write_all(b"Meow~\n").await.unwrap();

            // Read from the client until the connection is closed
            let mut buf = [0; 3];
            loop {
                // Print escaped characters for debug
                let n = reader.read(&mut buf).await.unwrap();
                for i in 0..n {
                    for e in std::ascii::escape_default(buf[i]) {
                        print!("{}", e as char);
                    }
                }
                writer
                    .write_all(
                        format!(
                            "\nReceived {n} bytes: {}",
                            String::from_utf8_lossy(&buf[..n])
                        )
                        .as_ref(),
                    )
                    .await
                    .unwrap();
                if n == 0 {
                    break;
                }
            }
        });
    }
    Ok(())
}
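
A possible explanation, offered as a sketch rather than a confirmed diagnosis: many telnet clients start in line mode and only transmit after Enter, so the buffering happens on the client and the server's 3-byte read buffer has no effect on it. The ANSI sequences sent above change how the terminal draws, not how the client sends input. A server can ask the client for character-at-a-time mode by announcing the ECHO (RFC 857) and SUPPRESS-GO-AHEAD (RFC 858) options, which is presumably what telnetlib3 negotiates behind the scenes. The std-only snippet below builds those negotiation bytes and filters the client's replies out of the received data. The function names `char_mode_negotiation` and `strip_telnet_commands` are hypothetical helpers made up for this illustration, and the filter assumes a telnet command is never split across two reads.

```rust
// Telnet command and option codes (RFC 854, RFC 857, RFC 858).
const IAC: u8 = 255; // "Interpret As Command" prefix
const DONT: u8 = 254;
const DO: u8 = 253;
const WONT: u8 = 252;
const WILL: u8 = 251;
const SB: u8 = 250; // start of subnegotiation
const SE: u8 = 240; // end of subnegotiation
const ECHO: u8 = 1;
const SUPPRESS_GO_AHEAD: u8 = 3;

/// Hypothetical helper: bytes a server can send right after accepting a
/// connection to request character-at-a-time mode from the client.
fn char_mode_negotiation() -> [u8; 6] {
    [IAC, WILL, ECHO, IAC, WILL, SUPPRESS_GO_AHEAD]
}

/// Hypothetical helper: remove telnet commands from a received chunk so that
/// only the user's keystrokes remain.
/// Assumption: a command is never split across two reads.
fn strip_telnet_commands(input: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(input.len());
    let mut i = 0;
    while i < input.len() {
        if input[i] != IAC {
            out.push(input[i]);
            i += 1;
            continue;
        }
        match input.get(i + 1) {
            // IAC IAC is an escaped literal 0xFF data byte.
            Some(&IAC) => {
                out.push(IAC);
                i += 2;
            }
            // Option negotiation is always three bytes: IAC <verb> <option>.
            Some(&(WILL | WONT | DO | DONT)) => i += 3,
            // Subnegotiation: skip everything up to and including IAC SE.
            Some(&SB) => {
                let mut j = i + 2;
                while j + 1 < input.len() && !(input[j] == IAC && input[j + 1] == SE) {
                    j += 1;
                }
                i = j + 2;
            }
            // Any other command is two bytes: IAC <command>.
            Some(_) => i += 2,
            // Dangling IAC at the end of the buffer: drop it.
            None => i += 1,
        }
    }
    out
}
```

In the Tokio server above, this would mean calling `writer.write_all(&char_mode_negotiation())` before the welcome message and passing `&buf[..n]` through `strip_telnet_commands` before interpreting it. Once the server announces WILL ECHO, the client typically stops echoing locally, so the server has to write back any characters it wants the user to see.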