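I tried to modify the example from here so that the processes run in parallel. In my use case the processes can be idle for most of their lifetime, so running them in parallel makes better use of the CPU. When I run it, it sometimes crashes with the following error: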
a.out(17512,0x1dac25c40) malloc: *** error for object 0x600003d60000: pointer being freed was not allocated
a.out(17512,0x1dac25c40) malloc: *** set a breakpoint in malloc_error_break to debug
libc++abi: terminating due to uncaught exception of type std::__1::future_error: The associated promise has been destructed prior to the associated state becoming ready.
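I then tried to run everything from the io_context using boost::asio::post(ioc, ...) (Option 2 below), but that did not go well either: I ended up in a deadlock where the std::future blocks the thread and prevents the process from making progress. I suppose I could add more threads to run the io_context, but I would prefer some kind of std::future that can yield. I would be happy to hear suggestions for fixing my code: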
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <iostream>
#include <boost/thread.hpp>

using duration = std::chrono::system_clock::duration;
namespace asio = boost::asio;
using namespace std::chrono_literals;

std::string ExecuteProcess(boost::filesystem::path exe,
                           std::vector<std::string> args, //
                           duration time, //
                           std::error_code& ec, //
                           asio::io_context& ioc) {
    namespace bp = boost::process;
    std::future<std::string> data, err_output;
    auto const deadline = std::chrono::steady_clock::now() + time;
    bp::group g;
    ec.clear();
    bp::child child(exe, args, ioc, g, bp::error(ec), bp::std_in.null(), bp::std_out > data,
                    bp::std_err > err_output);
    if (ec) {
        return {};
    }
    if (data.wait_until(deadline) == std::future_status::ready) {
        return data.get();
    }
    if (std::error_code ignore; child.running(ignore)) {
        g.terminate(ignore);
    }
    ec = make_error_code(asio::error::timed_out); // TODO FIXME
    return {};
}

int main() {
    constexpr duration timeout = 20s;
    [[maybe_unused]] constexpr auto script1 = "/usr/bin/curl http://httpbin.org/ip -m 5";
    [[maybe_unused]] constexpr auto script2 = R"(delay="0.5"; sleep "$delay"; echo -n "sanity restored after $delay")";
    asio::io_context ioc;
    auto work = make_work_guard(ioc); // prevent running out of work
    std::thread io_thread([&ioc] { ioc.run(); });

    // Option 1 : use thread group
    // boost::thread_group worker_threads;
    // for (int i = 0; i < 20; i++) {
    //     worker_threads.create_thread([&]() {
    //         std::error_code ec;
    //         auto s = ExecuteProcess("/bin/bash", {"-c", script2}, timeout, ec, ioc);
    //         std::cout << "got " << ec.message() << ": " << s << std::endl;
    //     });
    // }
    //
    // work.reset(); // allow running out of work
    // io_thread.join();
    // worker_threads.join_all();

    // Option 2 : use post-io_context
    for (int i = 0; i < 20; i++) {
        boost::asio::post(ioc, [&]() {
            std::error_code ec;
            auto s = ExecuteProcess("/bin/bash", {"-c", script2}, timeout, ec, ioc);
            std::cout << "got " << ec.message() << ": " << s << std::endl;
        });
    }
    work.reset(); // allow running out of work
    io_thread.join();
}
The code can be compiled for testing with the following command:
g++ -std=c++20 -g -O3 -Wall -pedantic -pthread -lboost_{thread,coroutine,context} ~/main.cpp -I<path_to_boost_headers> -L<path_to_boost_libs>
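The blocking behaviour seen with Option 2 can be illustrated without Boost. The following is only a minimal sketch: SingleThreadedQueue is a hypothetical stand-in for an io_context that is run by a single thread, and blocking_wait_starves_producer is an invented name. A task that blocks on a std::future occupies the only thread, so the queued work that would make the future ready cannot run until the wait gives up.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <utility>

// Hypothetical stand-in for an io_context served by one thread:
// queued tasks run strictly one after another.
struct SingleThreadedQueue {
    std::deque<std::function<void()>> tasks;

    void post(std::function<void()> task) { tasks.push_back(std::move(task)); }

    void run() {
        while (!tasks.empty()) {
            auto task = std::move(tasks.front());
            tasks.pop_front();
            task();
        }
    }
};

// Returns true when the blocking wait timed out, i.e. the producer never
// got a chance to run while the consumer held the only thread.
inline bool blocking_wait_starves_producer(std::chrono::milliseconds patience) {
    assert(patience > std::chrono::milliseconds::zero());

    SingleThreadedQueue queue;
    std::promise<int> promise;
    std::future<int> result = promise.get_future();
    bool timed_out = false;

    // Consumer: blocks the single service "thread" waiting for the result.
    queue.post([&] {
        timed_out = result.wait_for(patience) == std::future_status::timeout;
    });
    // Producer: would make the future ready, but is queued behind the consumer.
    queue.post([&] { promise.set_value(42); });

    queue.run();
    return timed_out;
}
```

Calling blocking_wait_starves_producer(50ms) reports a timeout: the value only arrives after the waiting task has given up, which mirrors ExecuteProcess waiting on data while running on the io_context thread.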
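Yes. It is a bit annoying that boost::process::child does not expose a regular async interface that is compatible with completion tokens, even though it is tightly integrated with Asio and naturally deals with 100% asynchronous processes.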
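Such an interface does exist: https://beta.boost.org/doc/libs/1_82_0/doc/html/boost/process/async_system.html. However, it does not work with C++14-only completion tokens. It also has the very old anti-pattern of passing a reference to asio::io_context.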
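This makes it hard to use intelligently, e.g. with a strand, or even with asio::thread_pool. However, we can roll our own initiation function that works around these shortcomings. Sadly, we will always need an io_context instance, although you might choose to use an internal ("global") instance hidden inside the interface.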
Using some convenience typedefs:
using boost::filesystem::path;
using Signature = void(std::error_code, int, std::string);
using Duration = std::chrono::system_clock::duration;
using Args = std::vector<std::string>;
let's define the completion token interface as:
template <typename Token>
auto asyncExecuteProcess( //
path exe, Args args, Duration limit, asio::io_context& ioc, //
Token&& token);
Before diving in, here is how we want to use it: run several processes in parallel under a time limit and handle each result as soon as it is ready:
int main() {
    using ProcessExecution::asyncExecuteProcess;
    constexpr auto timeout = 50ms;
    constexpr auto script = R"(
        number=$((1 + $RANDOM % 9));
        sleep "0.0$number";
        echo -n "sanity restored after 0.0$number";
        exit $number
    )";

    asio::io_context ioc(1);
    auto work = make_work_guard(ioc);

    for (int i = 0; i < 10; i++)
        asyncExecuteProcess( //
            "/bin/bash", {"-c", script}, timeout, ioc, //
            [i](std::error_code ec, int exit_code, std::string data) {
                // print every completion, successful or not
                std::cout << "Completed #" << i << " " << ec.message() << " " << exit_code << " "
                          << quoted(data) << std::endl;
            });

    std::cout << "Everything initiated, single-threaded multiplexed completion" << std::endl;
    work.reset();
    ioc.run();
}
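The embedded bash script sleeps for a random 1..9 × 10ms and returns the corresponding exit code 1..9. Because we start them all "at once", we expect the results to arrive in ascending order of delay, but not necessarily in order of job number (#i).
Since every delay over 50ms will time out, we expect the last few entries to show an error.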
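To make that expectation concrete, here is a small model of the scheduling outcome. It is an idealized sketch only: Job, Outcome and expected_completion_order are hypothetical names, process start-up overhead is ignored, and a delay exactly equal to the limit is assumed to finish in time (in reality that case is a race).

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <vector>

struct Job {
    int id;
    std::chrono::milliseconds delay; // how long the script sleeps
};

struct Outcome {
    int id;
    bool killed; // true when the time limit expired first
};

// All jobs start at once. A job completes after min(delay, limit):
// either it finishes on its own or it is terminated at the limit.
inline std::vector<Outcome> expected_completion_order(std::vector<Job> jobs,
                                                      std::chrono::milliseconds limit) {
    assert(limit >= std::chrono::milliseconds::zero());

    // stable_sort keeps the submission order among jobs that complete together
    std::stable_sort(jobs.begin(), jobs.end(), [limit](Job const& a, Job const& b) {
        return std::min(a.delay, limit) < std::min(b.delay, limit);
    });

    std::vector<Outcome> order;
    for (Job const& job : jobs)
        order.push_back({job.id, job.delay > limit});
    return order;
}
```

With delays of 70ms, 20ms, 40ms and 90ms for jobs #0..#3 and a 50ms limit, the model yields #1, #2, #0, #3, with the last two marked as killed.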
In the implementation details, we need to deduce the associated executor (in case the completion token is e.g. bind_executor(my_strand, asio::use_awaitable)):
auto ex = asio::get_associated_executor(token, ioc.get_executor());
Next, some resources need to be allocated so that they stay stable for the duration of the asynchronous operation:
auto a = asio::get_associated_allocator(token);
auto st = std::allocate_shared<stable_state>(a, ex, deadline);
The stable state contains everything that cannot be (cheaply) moved:
struct stable_state : std::enable_shared_from_this<stable_state> {
    stable_state(asio::any_io_executor ex, time_point t) : timer(ex, t) {}

    asio::steady_timer       timer;
    bp::group                process_group;
    std::future<std::string> out, err;

    void arm_timer() {
        timer.async_wait([self = this /*->shared_from_this()*/](boost::system::error_code ec) {
            if (!ec)
                self->process_group.terminate(ec);
        });
    }
};
The rest of the initiation function might look like this:
auto init = [&ioc, ex, st] //
    (auto&& handler, path exe, Args args) {
        auto wrapped_handler = [h = std::move(handler), ex, st] //
            (int exit_code, std::error_code ec) mutable {
                // zero cancelled waits means the timer already fired
                bool terminated = !st->timer.expires_at(time_point::min());
                bool ok         = !terminated && !ec;

                std::string data;
                if (ok) {
                    if (st->out.wait_for(0s) == std::future_status::ready)
                        data = st->out.get();
                    else
                        ec = make_error_code(asio::error::interrupted); // TODO
                }
                if (terminated) {
                    data = "Killed";
                    if (!ec)
                        ec = make_error_code(asio::error::operation_aborted);
                }

                assert(st.use_count() == 1); // shared_ptr::unique() was removed in C++20
                st.reset();                  // deallocate before invoking the handler

                asio::dispatch( //
                    ex, [=, h = std::move(h), d = std::move(data)]() mutable { //
                        std::move(h)(ec, exit_code, std::move(d));
                    });
            };

        bp::child(ioc, exe, args,     //
                  st->process_group,  //
                  bp::ignore_error,   //
                  bp::std_in.null(),  //
                  bp::std_out > st->out, //
                  bp::std_err > st->err, //
                  bp::on_exit(std::move(wrapped_handler)))
            .detach();
        st->arm_timer();
    };

return asio::async_initiate<Token, Signature>( //
    init, token, std::move(exe), std::move(args));
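Note that I chose to "soft-detect" termination indirectly, by detecting whether the timer had already completed. On POSIX, a better approach would be to keep the bp::child in the stable state and query native_exit_code, as described here: https://stackoverflow.com/a/57733210/85371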
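To show what querying the native exit status buys you on POSIX, here is a sketch that uses plain fork/waitpid instead of Boost.Process. It assumes, as the linked answer suggests, that the native exit code is the raw wait status, so WIFSIGNALED and WTERMSIG can be applied to it. ChildStatus and run_and_maybe_kill are hypothetical names for illustration.

```cpp
#include <cassert>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

struct ChildStatus {
    bool signaled;           // true when the child was ended by a signal
    int signal_or_exit_code; // signal number if signaled, else the exit code
};

// Forks a child, optionally kills it, and decodes the raw wait status.
inline ChildStatus run_and_maybe_kill(bool kill_it) {
    pid_t pid = fork();
    assert(pid >= 0);
    if (pid == 0) { // child process
        if (kill_it)
            pause(); // sleep until a signal arrives
        _exit(7);    // normal exit with a recognizable code
    }

    if (kill_it)
        kill(pid, SIGKILL);

    int status = 0;
    pid_t waited = waitpid(pid, &status, 0);
    assert(waited == pid);
    (void)waited;

    if (WIFSIGNALED(status))
        return {true, WTERMSIG(status)};
    return {false, WEXITSTATUS(status)};
}
```

A killed child reports signaled == true with SIGKILL, while a child that exits by itself reports its exit code, so termination is detected from the status itself rather than inferred from the timer.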
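Otherwise, most of the code is about making sure the right executor is used, releasing the allocation at the required moment, and asserting various things to guard against programming errors.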
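The ownership pattern can be shown in isolation with the standard library only. This is a sketch under stated assumptions: StableState, FinalHandler and initiate are hypothetical names, and a std::function stands in for the callback that the process library would hold until exit. The intermediate handler is the sole owner of the state, takes what it needs, and releases the state before calling the final handler.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical state that must stay put while the operation is in flight.
struct StableState {
    std::string output;
};

using FinalHandler = std::function<void(int exit_code, std::string data, bool state_released)>;

// Hypothetical initiation: returns the intermediate handler that owns the state.
inline std::function<void(int)> initiate(FinalHandler final_handler) {
    auto st = std::allocate_shared<StableState>(std::allocator<StableState>{});
    st->output = "output";

    return [st, h = std::move(final_handler)](int exit_code) mutable {
        std::string data = std::move(st->output); // take results out of the state

        assert(st.use_count() == 1); // the handler is the only owner left
        std::weak_ptr<StableState> observer = st;
        st.reset(); // release the state before invoking the final handler

        h(exit_code, std::move(data), observer.expired());
    };
}
```

When the returned callable is invoked, the final handler observes state_released == true: the state is gone by the time user code runs, which is the ordering the assertions in the initiation function are meant to enforce.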
Of course, where would we be without the proof of the pudding:
Live On Coliru (Coliru is too slow for the output to be illustrative)
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <cassert>
#include <iomanip> // std::quoted
#include <iostream>
using namespace std::chrono_literals;
namespace asio = boost::asio;

namespace ProcessExecution {
    using boost::filesystem::path;
    using Signature = void(std::error_code, int, std::string);
    using Duration  = std::chrono::system_clock::duration;
    using Args      = std::vector<std::string>;

    template <typename Token>
    auto asyncExecuteProcess( //
        path exe, Args args, Duration limit, asio::io_context& ioc, //
        Token&& token) //
    {
        namespace bp = boost::process;
        using time_point = std::chrono::steady_clock::time_point;
        time_point deadline = std::chrono::steady_clock::now() + limit;

        // everything that cannot be (cheaply) moved
        struct stable_state : std::enable_shared_from_this<stable_state> {
            stable_state(asio::any_io_executor ex, time_point t) : timer(ex, t) {}

            asio::steady_timer       timer;
            bp::group                process_group;
            std::future<std::string> out, err;

            void arm_timer() {
                timer.async_wait([self = this /*->shared_from_this()*/](boost::system::error_code ec) {
                    if (!ec)
                        self->process_group.terminate(ec);
                });
            }
        };

        auto ex = asio::get_associated_executor(token, ioc.get_executor());
        auto a  = asio::get_associated_allocator(token);
        auto st = std::allocate_shared<stable_state>(a, ex, deadline);

        auto init = [&ioc, ex, st] //
            (auto&& handler, path exe, Args args) {
                auto wrapped_handler = [h = std::move(handler), ex, st] //
                    (int exit_code, std::error_code ec) mutable {
                        // zero cancelled waits means the timer already fired
                        bool terminated = !st->timer.expires_at(time_point::min());
                        bool ok         = !terminated && !ec;

                        std::string data;
                        if (ok) {
                            if (st->out.wait_for(0s) == std::future_status::ready)
                                data = st->out.get();
                            else
                                ec = make_error_code(asio::error::interrupted); // TODO
                        }
                        if (terminated) {
                            data = "Killed";
                            if (!ec)
                                ec = make_error_code(asio::error::operation_aborted);
                        }

                        assert(st.use_count() == 1); // shared_ptr::unique() was removed in C++20
                        st.reset();                  // deallocate before invoking the handler

                        asio::dispatch( //
                            ex, [=, h = std::move(h), d = std::move(data)]() mutable { //
                                std::move(h)(ec, exit_code, std::move(d));
                            });
                    };

                bp::child(ioc, exe, args,     //
                          st->process_group,  //
                          bp::ignore_error,   //
                          bp::std_in.null(),  //
                          bp::std_out > st->out, //
                          bp::std_err > st->err, //
                          bp::on_exit(std::move(wrapped_handler)))
                    .detach();
                st->arm_timer();
            };

        return asio::async_initiate<Token, Signature>( //
            init, token, std::move(exe), std::move(args));
    }
} // namespace ProcessExecution

int main() {
    using ProcessExecution::asyncExecuteProcess;
    constexpr auto timeout = 50ms;
    constexpr auto script = R"(
        number=$((1 + $RANDOM % 9));
        sleep "0.0$number";
        echo -n "sanity restored after 0.0$number";
        exit $number
    )";

    asio::io_context ioc(1);
    auto work = make_work_guard(ioc);

    for (int i = 0; i < 10; i++)
        asyncExecuteProcess( //
            "/bin/bash", {"-c", script}, timeout, ioc, //
            [i](std::error_code ec, int exit_code, std::string data) {
                std::cout << "Completed #" << i << " " << ec.message() << " " << exit_code << " "
                          << quoted(data) << std::endl;
            });

    std::cout << "Everything initiated, single-threaded multiplexed completion" << std::endl;
    work.reset();
    ioc.run();
}
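¹ There is still a subtle race here: even when terminated==false, we still have to check that the out future is ready. If we don't, we can get very unlucky and hit a race where the future is not ready yet although the process has already ended. I suspect this indicates that Boost Process does not actually do all of its work on the service thread (as it should). I may look into this later.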
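The readiness check that guards against this race is a non-blocking poll: wait_for with a zero duration. A minimal standard-library sketch, where try_get is a hypothetical helper name:

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <optional>
#include <string>

// Returns the value if the future is already ready; never blocks.
inline std::optional<std::string> try_get(std::future<std::string>& f) {
    assert(f.valid());
    if (f.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
        return f.get();
    return std::nullopt; // not ready yet: the caller decides how to report it
}
```

In the completion handler above, the "not ready" branch is what gets mapped to asio::error::interrupted instead of blocking the service thread on get().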