使用单个io_context并行运行多个超时进程

问题描述 投票:0回答:1

我尝试修改here的示例并使进程并行运行,因为在我的用例中,进程在其生命周期的大部分时间里都可以处于空闲状态,因此并行运行时可以更好地利用CPU资源

  1. 首先我尝试让每个任务都从 boost::thread_group 对象运行。 在运行它时,我有时会遇到意外的行为,导致崩溃,原因如下:
a.out(17512,0x1dac25c40) malloc: *** error for object 0x600003d60000: pointer being freed was not allocated
a.out(17512,0x1dac25c40) malloc: *** set a breakpoint in malloc_error_break to debug
libc++abi: terminating due to uncaught exception of type std::__1::future_error: The associated promise has been destructed prior to the associated state becoming ready.```
  1. 然后我尝试替换 thread_group 并使用
    boost::asio::post(ioc, [&]() {
    从 io_context 运行所有内容,但它也进展不顺利,因为我陷入了死锁,其中 std::future 阻塞了线程,并阻止了进程运行。我想可以添加更多线程来运行 io_context,但我更喜欢有一些可以产生结果的 std::future。

我很高兴听到修复我的代码的建议:

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <iostream>
#include <boost/thread.hpp>


using duration = std::chrono::system_clock::duration;
namespace asio = boost::asio;
using namespace std::chrono_literals;

std::string ExecuteProcess(boost::filesystem::path  exe,
                           std::vector<std::string> args, //
                           duration                 time, //
                           std::error_code&         ec,   //
                           asio::io_context&        ioc) {
    namespace bp = boost::process;
    std::future<std::string> data, err_output;

    auto const deadline = std::chrono::steady_clock::now() + time;

    bp::group  g;
    ec.clear();
    bp::child child(exe, args, ioc, g, bp::error(ec), bp::std_in.null(), bp::std_out > data,
                    bp::std_err > err_output);

    if (ec) {
        return {};
    }

    if (data.wait_until(deadline) == std::future_status::ready) {
        return data.get();
    }

    if (std::error_code ignore; child.running(ignore)) {
        g.terminate(ignore);
    }

    ec = make_error_code(asio::error::timed_out); // TODO FIXME
    return {};
}

int main() {
    constexpr duration              timeout = 20s;
    [[maybe_unused]] constexpr auto script1 = "/usr/bin/curl http://httpbin.org/ip -m 5";
    [[maybe_unused]] constexpr auto script2 = R"(delay="0.5"; sleep "$delay"; echo -n "sanity restored after $delay")";

    asio::io_context ioc;

    auto        work = make_work_guard(ioc); // prevent running out of work
    std::thread io_thread([&ioc] { ioc.run(); });


// Option 1 : use thread group

//    boost::thread_group worker_threads;
//    for (int i = 0; i < 20; i++) {
//      worker_threads.create_thread([&]() {
//          std::error_code ec;
//          auto s = ExecuteProcess("/bin/bash", {"-c", script2}, timeout, ec, ioc);
//          std::cout << "got " << ec.message() << ": " << s << std::endl;
//      });
//    }
//
//    work.reset(); // allow running out of work
//    io_thread.join();
//    worker_threads.join_all();



// Option 2 : use post-io_context
    for (int i = 0; i < 20; i++) {
    boost::asio::post(ioc, [&]() {
            std::error_code ec;
            auto s = ExecuteProcess("/bin/bash", {"-c", script2}, timeout, ec, ioc);
            std::cout << "got " << ec.message() << ": " << s << std::endl;
      });
    }
    work.reset(); // allow running out of work
    io_thread.join();

}

可以使用以下命令编译代码进行测试:

g++ -std=c++20 -g -O3 -Wall -pedantic -pthread -lboost_{thread,coroutine,context} ~/main.cpp -I<path_to_boost_headers> -L<path_to_boost_libs>
c++ boost boost-asio boost-process
1个回答
0
投票

是的。有点烦人的是

boost::process::child
没有公开与完成令牌兼容的常规异步接口,尽管它与 Asio 紧密集成并自然地处理 100% 异步 processes

此接口确实存在:https://beta.boost.org/doc/libs/1_82_0/doc/html/boost/process/async_system.html。但是,它无法与仅限 c++14 的完成标记一起使用。它还具有非常古老的反模式,即传递对

asio::io_context
的引用。

这使得很难智能地使用它,例如用一根线,甚至用

asio::thread_pool
。然而,我们可以推出自己的启动函数来解决这些缺点。遗憾的是,我们始终需要一个
io_context
实例,尽管您可能选择使用隐藏在界面中的内部(“全局”)实例。

使用一些方便的类型定义

using boost::filesystem::path;
using Signature = void(std::error_code, int, std::string);
using Duration  = std::chrono::system_clock::duration;
using Args      = std::vector<std::string>;

让我们将完成令牌接口定义为

template <typename Token>
auto asyncExecuteProcess(                                       //
    path exe, Args args, Duration limit, asio::io_context& ioc, //
    Token&& token);

在深入研究之前,我们希望使用它在一定的时间限制下并行运行多个进程,并在每个结果准备好后立即处理它:

int main() {
    using ProcessExecution::asyncExecuteProcess;

    constexpr auto timeout = 50ms;
    constexpr auto script  = R"(
         number=$((1 + $RANDOM % 9));
         sleep "0.0$number";
         echo -n "sanity restored after 0.0$number";
         exit $number
    )";

    asio::io_context ioc(1);
    auto work = make_work_guard(ioc);

    for (int i = 0; i < 10; i++)
        asyncExecuteProcess(                           //
            "/bin/bash", {"-c", script}, timeout, ioc, //
            [i](std::error_code ec, int exit_code, std::string data) {
                if (ec)
                    std::cout << "Completed #" << i << " " << ec.message() << " " << exit_code << " "
                              << quoted(data) << std::endl;
            });

    std::cout << "Everything initiated, single-threaded multiplexed completion" << std::endl;

    work.reset();
    ioc.run();
}

嵌入的 bash 脚本在 1..10 * 10ms 之间随机休眠,并返回相应的退出代码 1..10。由于我们“立即”启动它们,因此我们期望结果按延迟的升序排列,但不一定按作业编号的顺序(#i)。

由于所有超过 50 毫秒的延迟都会超时,我们预计最后几个条目会显示错误。

魔法

在实现细节中,我们需要推导出关联的执行器(如果完成令牌是例如

bind_executor(my_strand, asio::use_awaitable)
)。

auto ex = asio::get_associated_executor(token, ioc.get_executor());

接下来,需要分配一些资源,以便它们在异步操作期间保持稳定:

auto a  = asio::get_associated_allocator(token);
auto st = std::allocate_shared<stable_state>(a, ex, deadline);

稳定状态包含所有不能(廉价)移动的东西:

struct stable_state : std::enable_shared_from_this<stable_state> {
    stable_state(asio::any_io_executor ex, time_point t) : timer(ex, t) {}
    asio::steady_timer       timer;
    bp::group                process_group;
    std::future<std::string> out, err;

    void arm_timer() {
        timer.async_wait([self = this /*->shared_from_this()*/](boost::system::error_code ec) {
            if (!ec)
                self->process_group.terminate(ec);
        });
    }
};

启动函数的其余部分可能如下所示:

auto init = [&ioc, ex, st] //
    (auto&& handler, path exe, Args args) {
        auto wrapped_handler = [h = std::move(handler), ex, st] //
            (int exit_code, std::error_code ec) mutable {
                bool terminated = !st->timer.expires_at(time_point::min());
                bool ok         = !terminated && !ec;

                std::string data;
                if (ok) {
                    if (st->out.wait_for(0s) == std::future_status::ready)
                        data = st->out.get();
                    else
                        ec = make_error_code(asio::error::interrupted); // TODO
                }

                if (terminated) {
                    data = "Killed";
                    if (!ec)
                        ec = make_error_code(asio::error::operation_aborted);
                }

                assert(st.unique());
                st.reset(); // deallocate before invoking the handler

                asio::dispatch(                                                //
                    ex, [=, h = std::move(h), d = std::move(data)]() mutable { //
                        std::move(h)(ec, exit_code, std::move(d));
                    });
            };

        bp::child(ioc, exe, args,        //
                  st->process_group,     //
                  bp::ignore_error,      //
                  bp::std_in.null(),     //
                  bp::std_out > st->out, //
                  bp::std_err > st->err, //
                  bp::on_exit(std::move(wrapped_handler)))
            .detach();

        st->arm_timer();
    };
return asio::async_initiate<Token, Signature>( //
    init, token, std::move(exe), std::move(args));

注意我选择通过检测计时器是否已经完成来间接“软检测”终止。在 POSIX 上,更好的方法是将

bp::child
保持在稳定状态并查询
native_exit_code
,如下所述:https://stackoverflow.com/a/57733210/85371

否则,大多数代码都是确保使用正确的执行器,在需要的时刻释放分配,并执行各种断言以防止编程错误。

演示时间

当然,如果没有布丁的证明,我们会在哪里:

Live On Coliru(Coliru 速度太慢,输出无法说明)

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <iostream>
using namespace std::chrono_literals;
namespace asio = boost::asio;

namespace ProcessExecution {
    using boost::filesystem::path;
    using Signature = void(std::error_code, int, std::string);
    using Duration  = std::chrono::system_clock::duration;
    using Args      = std::vector<std::string>;

    template <typename Token>
    auto asyncExecuteProcess(                                       //
        path exe, Args args, Duration limit, asio::io_context& ioc, //
        Token&& token)                                              //
    {
        namespace bp = boost::process;

        using time_point    = std::chrono::steady_clock::time_point;
        time_point deadline = std::chrono::steady_clock::now() + limit;

        struct stable_state : std::enable_shared_from_this<stable_state> {
            stable_state(asio::any_io_executor ex, time_point t) : timer(ex, t) {}
            asio::steady_timer       timer;
            bp::group                process_group;
            std::future<std::string> out, err;

            void arm_timer() {
                timer.async_wait([self = this /*->shared_from_this()*/](boost::system::error_code ec) {
                    if (!ec)
                        self->process_group.terminate(ec);
                });
            }
        };

        auto ex = asio::get_associated_executor(token, ioc.get_executor());
        auto a  = asio::get_associated_allocator(token);
        auto st = std::allocate_shared<stable_state>(a, ex, deadline);

        auto init = [&ioc, ex, st] //
            (auto&& handler, path exe, Args args) {
                auto wrapped_handler = [h = std::move(handler), ex, st] //
                    (int exit_code, std::error_code ec) mutable {
                        bool terminated = !st->timer.expires_at(time_point::min());
                        bool ok         = !terminated && !ec;

                        std::string data;
                        if (ok) {
                            if (st->out.wait_for(0s) == std::future_status::ready)
                                data = st->out.get();
                            else
                                ec = make_error_code(asio::error::interrupted); // TODO
                        }

                        if (terminated) {
                            data = "Killed";
                            if (!ec)
                                ec = make_error_code(asio::error::operation_aborted);
                        }

                        assert(st.unique());
                        st.reset(); // deallocate before invoking the handler

                        asio::dispatch(                                                //
                            ex, [=, h = std::move(h), d = std::move(data)]() mutable { //
                                std::move(h)(ec, exit_code, std::move(d));
                            });
                    };

                bp::child(ioc, exe, args,        //
                          st->process_group,     //
                          bp::ignore_error,      //
                          bp::std_in.null(),     //
                          bp::std_out > st->out, //
                          bp::std_err > st->err, //
                          bp::on_exit(std::move(wrapped_handler)))
                    .detach();

                st->arm_timer();
            };
        return asio::async_initiate<Token, Signature>( //
            init, token, std::move(exe), std::move(args));
    }
} // namespace ProcessExecution

int main() {
    using ProcessExecution::asyncExecuteProcess;

    constexpr auto timeout = 50ms;
    constexpr auto script  = R"(
         number=$((1 + $RANDOM % 9));
         sleep "0.0$number";
         echo -n "sanity restored after 0.0$number";
         exit $number
    )";

    asio::io_context ioc(1);
    auto work = make_work_guard(ioc);

    for (int i = 0; i < 10; i++)
        asyncExecuteProcess(                           //
            "/bin/bash", {"-c", script}, timeout, ioc, //
            [i](std::error_code ec, int exit_code, std::string data) {
                std::cout << "Completed #" << i << " " << ec.message() << " " << exit_code << " "
                          << quoted(data) << std::endl;
            });

    std::cout << "Everything initiated, single-threaded multiplexed completion" << std::endl;

    work.reset();
    ioc.run();
}


¹ 这里仍然存在着一场微妙的竞赛,即使

terminated==false
我们仍然必须检查
out
的未来是
ready
。如果我们不这样做,我们可能会非常不幸,并且会发生一场未来尚未准备好但进程已被终止的竞赛。我怀疑这表明 Boost Process 实际上并未在服务线程上执行所有工作(根据需要)。我可能稍后会研究这个。

© www.soinside.com 2019 - 2024. All rights reserved.