我正在尝试禁用标准输入缓冲,以便读取 ANSI 代码的响应
\033[6n
(应该报告光标位置)。
我按照答案中的建议尝试了
stdin_ub = os.fdopen(stdin.fileno(), 'rb', buffering=0)
为sys.stdin设置较小的缓冲区大小?,但程序仍然在第一次尝试读取的第ch = stdin_ub.read(1)
行被阻止。当在终端中输入 return 时,它会解除阻塞,这表明标准输入仍然是行缓冲的。
作为参考,这是完整的代码:
def getpos():
stdin_ub = os.fdopen(sys.stdin.fileno(), 'rb', buffering=0)
sys.stdout.write('\033[6n')
sys.stdout.flush()
ch, k, field = None, -1, [b'', b'']
while True:
#print('reading wait...')
ch = stdin_ub.read(1)
#print('reading OK')
if ch == b'[': k = 0
elif ch == b';': k = 1
elif ch == b'R': break
elif k >= 0: field[k] += ch
try:
return tuple(map(int, field))
except:
pass
我使用的是python 3.5.1
技巧是使用
tty.setcbreak(sys.stdin.fileno(), termios.TCSANOW)
并在此之前通过 termios.getattr
将终端属性存储在变量中以恢复默认行为。设置 cbreak
后,sys.stdin.read(1)
不缓冲。这也会抑制来自终端的 ansi 控制代码响应。
def getpos():
buf = ""
stdin = sys.stdin.fileno()
tattr = termios.tcgetattr(stdin)
try:
tty.setcbreak(stdin, termios.TCSANOW)
sys.stdout.write("\x1b[6n")
sys.stdout.flush()
while True:
buf += sys.stdin.read(1)
if buf[-1] == "R":
break
finally:
termios.tcsetattr(stdin, termios.TCSANOW, tattr)
# reading the actual values, but what if a keystroke appears while reading
# from stdin? As dirty work around, getpos() returns if this fails: None
try:
matches = re.match(r"^\x1b\[(\d*);(\d*)R", buf)
groups = matches.groups()
except AttributeError:
return None
return (int(groups[0]), int(groups[1]))
不幸的是,没有便携的方法可以做到这一点。在常见操作系统(例如 Windows 和 Unix 系列)上从键盘读取时,底层 IO 系统是行缓冲的。
curses 模块将提供一种几乎可移植的方式来控制线路规则,不幸的是它不适用于 Windows 系统。
能用就必须用
curses.noecho()
curses.raw() # or curses.cbreak()
进入raw模式(一般echo应该关闭)
和
curses.echo()
curses.noraw() # resp. curses.nocbreak()
恢复正常熟更多
我最近正在研究这个问题,发现这篇文章特别有帮助。需要注意的主要功能是
fcntl.fcntl
和 tty.setcbreak
。
fcntl.fcntl
tty.setcbreak
import fcntl
import os
import sys
import termios
import time
import tty
class raw(object):
def __init__(self, stream):
self.stream = stream
self.fd = self.stream.fileno()
def __enter__(self):
self.original_stty = termios.tcgetattr(self.stream)
tty.setcbreak(self.stream)
def __exit__(self, type, value, traceback):
termios.tcsetattr(self.stream, termios.TCSANOW, self.original_stty)
class nonblocking(object):
def __init__(self, stream):
self.stream = stream
self.fd = self.stream.fileno()
def __enter__(self):
self.orig_fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK)
def __exit__(self, *args):
fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl)
with raw(sys.stdin):
with nonblocking(sys.stdin):
while True:
c = sys.stdin.read(1)
if c:
print(repr(c))
else:
print('not ready')
time.sleep(.1)