我希望能够以编程方式运行文件,随时任意为其提供输入(标准输入),并定期轮询任何标准输出。我还希望能够随时终止该进程。
这是我尝试过的:
import subprocess
from threading import Thread
class Runner:
def __init__(self):
self.process = subprocess.Popen(
["python", "x.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
def run(self):
self.process.wait()
def poll(self):
print("got stdout:", self.process.stdout.readline().decode(), end="")
def give_input(self, text=""):
return self.process.stdin.write(bytes(text))
def kill(self):
self.process.kill()
r = Runner()
t = Thread(target=r.run)
t.start()
r.poll() # should be "hi"
r.poll() # should be "your name:"
r.give_input("hi\n")
r.kill()
t.join()
这是
x.py
中的代码
print("hi")
input("Your name: ")
因此,在我的演示中,我启动了一个线程,该线程将调用
run
来运行该进程。然后,我为标准输出 poll
两次。应该打印出来
got stdout: hi
got stdout: Your name:
然后,我给该进程一个标准输入——
hi\n
。
此时,程序应该终止,我通过执行
r.kill()
来确保这一点。
但是,程序并没有按预期运行。
相反,程序在第二个
r.poll()
处冻结,我不知道为什么。我怀疑由于没有换行符,程序将继续读取,但我不知道如何防止它这样做。
有什么想法吗?
subprocess.PIPE
上的非阻塞读取:
import subprocess
from queue import Queue, Empty
from threading import Thread
from typing import IO
import io
class Runner:
def __init__(self, stdin_input: str):
self.process = subprocess.Popen(
"py x.py",
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=2,
close_fds=False,
)
self.process.stdin.write(stdin_input.encode() + b"\n")
def reader(self, out: IO[str], queue: Queue[bytes]):
def enqueue_output(out: IO[str], queue: Queue[bytes]):
stream = io.open(out.fileno(), "rb", closefd=False)
while True:
n = stream.read1()
if len(n) > 0:
queue.put(n)
else:
return
t = Thread(target=enqueue_output, args=(out, queue))
# end thread when program ends
t.daemon = True
t.start()
def run(self, timeout=1):
stdout_queue: Queue[bytes] = Queue()
stderr_queue: Queue[bytes] = Queue()
self.reader(self.process.stdout, stdout_queue)
self.reader(self.process.stderr, stderr_queue)
try:
self.process.communicate(timeout=timeout)
except:
print("ERROR: TIMED OUT")
finally:
print("=== STDOUT ===")
try:
while True:
print(stdout_queue.get_nowait().decode(), end="")
except Empty:
print("=== STDOUT ===\n\n")
print("=== STDERR ===")
try:
while True:
print(stderr_queue.get_nowait().decode(), end="")
except Empty:
print("=== STDERR ===")
r = Runner("5\n6")
r.run()
不幸的是,我无法弄清楚如何以交互方式为流程提供
stdin
,也无法定期轮询stdout
,但我能够为我的用例找到一个不错的解决方案。我没有经常获得输出,而是只是在终止程序之前运行几秒钟。
希望这对某人有帮助!