我正在Rust中创建一个小的ncurses应用程序,需要与子进程通信。我已经有了一个用Common Lisp编写的原型。我正在尝试重写它,因为CL为这么小的工具使用了大量的内存。
我在弄清楚如何与子流程进行交互时遇到了一些麻烦。
我目前正在做的大致是这样的:
let mut program = match Command::new(command)
.args(arguments)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(child) => child,
Err(_) => {
println!("Cannot run program '{}'.", command);
return;
}
};
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
match program.stdout {
Some(ref mut out) => {
let mut buf_string = String::new();
match out.read_to_string(&mut buf_string) {
Ok(_) => output_viewer.append_string(buf_string),
Err(_) => return,
};
}
None => return,
};
}
然而,对read_to_string
的调用会阻止程序直到进程退出。从我所看到的read_to_end
和read
似乎也阻止了。如果我尝试运行像ls
这样的东西,它会立即退出,它可以工作,但有些东西不会像python
或sbcl
一样退出,只有在我手动杀死子进程后它才会继续。
基于this answer,我更改了代码以使用BufReader
:
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
match program.stdout.as_mut() {
Some(out) => {
let buf_reader = BufReader::new(out);
for line in buf_reader.lines() {
match line {
Ok(l) => {
output_viewer.append_string(l);
}
Err(_) => return,
};
}
}
None => return,
}
}
但问题仍然存在。它将读取所有可用的行,然后阻止。由于该工具应该可以与任何程序一起使用,因此在尝试读取之前无法猜出输出何时结束。似乎没有办法为BufReader
设置超时。
默认情况下,流是阻止的。 TCP / IP流,文件系统流,管道流,它们都是阻塞的。当你告诉一个流给你一大块字节时,它会停止并等到它有给定的字节数或者直到发生其他事情(interrupt,流的结尾,错误)。
操作系统急于将数据返回到读取过程,所以如果你想要的只是等待下一行并在它进入时立即处理它,那么Shepmaster在Unable to pipe to or from spawned child process more than once中建议的方法就可以了。 (理论上它没有必要,因为允许操作系统使BufReader
等待read
中的更多数据,但实际上操作系统更喜欢早期的“短读取”等待)。
当您需要处理多个流(如子进程的BufReader
和stdout
)或多个进程时,这种简单的基于stderr
的方法会停止工作。例如,基于BufReader
的方法可能会在子进程等待您排空其stderr
管道时遇到死锁,而您的进程被阻塞等待它的空stdout
。
同样,当您不希望程序无限期地等待子进程时,您不能使用BufReader
。也许您想要在孩子仍在工作时显示进度条或计时器并且没有输出。
如果您的操作系统不急于将数据返回到进程(更喜欢“完全读取”到“短读取”),则不能使用基于BufReader
的方法,因为在这种情况下,子进程打印的最后几行可能最终处于灰色区域:操作系统得到它们,但它们不够大,无法填充BufReader
的缓冲区。
BufReader
仅限于Read
接口允许它与流进行的操作,它的阻塞程度不低于底层流。为了提高效率,它将以块的形式输入read,告诉操作系统尽可能多地填充其缓冲区。
您可能想知道为什么在块中读取数据如此重要,为什么BufReader
不能逐字节地读取数据。问题是要从流中读取数据,我们需要操作系统的帮助。另一方面,我们不是操作系统,我们与它隔离工作,以免在我们的流程出现问题时弄乱它。因此,为了调用操作系统,需要转换到“内核模式”,这也可能导致“上下文切换”。这就是调用操作系统读取每个字节的原因很昂贵。我们希望尽可能少的OS调用,因此我们批量获取流数据。
要在没有阻止的情况下等待流,您需要一个非阻塞流。 MIO promises to have the required non-blocking stream support for pipes,很可能与PipeReader,但到目前为止我还没有检查过。
流的非阻塞性质应该能够以块的形式读取数据,而不管操作系统是否更喜欢“短读取”。因为非阻塞流永远不会阻塞。如果流中没有数据,它只会告诉您。
在没有阻塞流的情况下,您将不得不求助于生成线程,以便阻塞读取将在单独的线程中执行,因此不会阻止您的主线程。您可能还希望逐字节读取流,以便在操作系统不喜欢“短读取”时立即对行分隔符做出反应。这是一个有效的例子:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78。
以下是使用tokio和tokio-process的示例。
use std::{
io::BufReader,
process::{Command, Stdio},
};
use tokio::{io, prelude::*, runtime::Runtime}; // 0.1.18
use tokio_process::CommandExt; // 0.2.3
fn main() {
let mut cmd = Command::new("/tmp/slow.bash")
.stdout(Stdio::piped())
.spawn_async()
.expect("cannot spawn");
let stdout = cmd.stdout().take().expect("no stdout");
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on({
io::lines(BufReader::new(stdout))
.inspect(|s| println!("> {}", s))
.collect()
});
println!("All the lines: {:?}", result);
}
以下是使用tokio和tokio-threadpool的示例。我们使用blocking
函数在一个线程中启动该过程。我们用stream::poll_fn
将其转换为流
use std::process::{Command, Stdio};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13
fn stream_command_output(
mut command: Command,
) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
// Ensure that the output is available to read from and start the process
let mut child = command
.stdout(Stdio::piped())
.spawn()
.expect("cannot spawn");
let mut stdout = child.stdout.take().expect("no stdout");
// Create a stream of data
stream::poll_fn(move || {
// Perform blocking IO
tokio_threadpool::blocking(|| {
// Allocate some space to store anything read
let mut data = vec![0; 128];
// Read 1-128 bytes of data
let n_bytes_read = stdout.read(&mut data).expect("cannot read");
if n_bytes_read == 0 {
// Stdout is done
None
} else {
// Only return as many bytes as we read
data.truncate(n_bytes_read);
Some(data)
}
})
})
}
fn main() {
let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on({
output_stream
.map(|d| String::from_utf8(d).expect("Not UTF-8"))
.fold(Vec::new(), |mut v, s| {
print!("> {}", s);
v.push(s);
Ok(v)
})
});
println!("All the lines: {:?}", result);
}
可以在这里做出许多可能的权衡。例如,总是分配128个字节并不理想,但实现起来很简单。
作为参考,这里是slow.bash:
#!/usr/bin/env bash
set -eu
val=0
while [[ $val -lt 10 ]]; do
echo $val
val=$(($val + 1))
sleep 1
done
也可以看看: