当作业在线程之间发布时,Boost ASIO 多线程应用程序崩溃

问题描述 投票:0回答:1

我正在尝试将 boost 单线程应用程序扩展到多线程应用程序。该代码基于这篇文章。我采取的方法是为每个线程提供专用的 IO_Context。在我的应用程序中,我需要将 ASIO JOB 从其他线程发布到另一个线程的 IO_Context。根据 boost article,这应该可以工作,但我的应用程序在几次迭代后因分段错误而崩溃。

#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>

namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;

static std::atomic_int tid_gen = 0;
thread_local int const tid     = [] { return ++tid_gen; }();
static constexpr auto  now = std::chrono::steady_clock::now;
static auto const      start   = now();
static std::mutex console_mx;

void trace(auto const&... msg) {
    std::lock_guard lk(console_mx);
    std::cerr << "at " << std::setw(8) << (now() - start)/1ms << "ms - tid:" << tid << " ";
    (std::cerr << ... << msg) << std::endl;
}

void worker(asio::io_context& ioContext) {
    trace("Worker thread enter");
    ioContext.run(); // Run the io_context to handle asynchronous operations
    trace("Worker thread exit");
}

int main() {
    try {
        asio::io_context ioContext1;
        asio::io_context ioContext2;
        std::function<void(const boost::system::error_code& error)> handler1;
        std::function<void(const boost::system::error_code& error)> handler2;

        asio::steady_timer task1(ioContext1, 100ms);
        asio::steady_timer task2(ioContext2, 200ms);

        handler1 = [&task1, &handler1, &ioContext2](error_code ec) {
            //trace("Start Task1: ", ec.message());
            if (!ec)
                usleep(5000);
            asio::post(ioContext2, []{
                trace("Task1 posted job on Task2");
            });
            task1.async_wait(handler1);
        };
        task1.async_wait(handler1);

        handler2 = [&task2, &handler2, &ioContext1](error_code ec) {
            //trace("Start Task2: ", ec.message());
            if (!ec)
                usleep(10000);
            asio::post(ioContext1, []{
                trace("Task2 posted job on Task1");
            });  
            task2.async_wait(handler2);
        };
        task2.async_wait(handler2);


        // Create a work object to prevent ioContext.run() from returning immediately
        auto work1 = make_work_guard(ioContext1);
        auto work2 = make_work_guard(ioContext2);

        // Create multiple worker threads
        std::vector<std::thread> threads;
        threads.emplace_back(worker, std::ref(ioContext1));
        threads.emplace_back(worker, std::ref(ioContext2));


        trace("App started :", std::thread::hardware_concurrency());

        work1.reset();
        work2.reset();

        // Join the worker threads
        for (auto& thread : threads) {
            thread.join();
        }

        trace("All worker threads joined.");
    } catch (std::exception const& e) {
        trace("Exception: ", std::quoted(e.what()));
    }
}

这是回溯轨迹

Thread 3 "application" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x763de380 (LWP 29973)]
boost::asio::detail::scheduler::do_run_one (this=this@entry=0x42d188, lock=..., this_thread=..., ec=...) at /usr/include/boost/asio/detail/impl/scheduler.ipp:458

有谁知道这种方法是否应该有效或预计会失败? 我尝试在发布职位之前添加单独的锁,但这也没有帮助。

任何其他在 IO_Context 之间发布作业的建议也将非常有用。 我在嵌入式 Linux 5.15 平台上使用 boost 版本 1.79,该平台在双核 Cortex A7 硬件上运行。

c++ multithreading boost-asio
1个回答
0
投票

我没有发现发布的代码有问题。

我注意到一些问题:计时器...没用,因为所有 async_wait 都会立即完成(第一个除外)。

工作警卫没有增加任何价值(因为总是有工作)。

我将其简化为这个

您还缺少

worker
函数中的异常处理(应该捕获 boost::asio::io_service::run() 抛出的异常吗?)。我将把它作为练习留给读者。

正确性证明

唯一引用timer1的代码在loop1中。 唯一引用timer2的代码在loop2中。

loop1/loop2 是隐式链(它们仅从自己的完成处理程序连续执行链接)。

ioc1
ioc2
是共享的,但是记录的线程安全(假设有足够的生命周期,即除了构造/销毁),还有一些与这里不相关的例外:

共享对象:安全,但

restart()
notify_fork()
函数除外。在有未完成的 run()、
run_one()
run_for()
run_until()
poll()
poll_one()
调用时调用 restart() 会导致未定义的行为。在另一个线程中调用任何
notify_fork()
函数或与
io_context
关联的 I/O 对象上的任何函数时,不应调用
io_context
函数。

简化 - 封装

如果您希望异步计时器定期重新安排,我之前已经做了一些示例来封装该逻辑,这样您就不需要重复这么多代码。

此外,它应该将计时器与执行上下文分离,因为如果需要的话,在单个上下文/线程上运行多个计时器是完全有效的。当然,如果任务占用大量 CPU,您不应该从(同一)服务线程调用它们。

请参阅 Asio async_wait 给出操作已取消以获取完整说明。

简化 - 线程池和线程

为了弥补异常处理的缺乏以及许多(冗余)上下文的杂乱,我建议使用共享上下文。现在,您仍然可以通过使用链确保所有处理程序在特定“任务”上“序列化”。

当您说

“所以我想要对锁做的是,在发布作业之前获取 io_context 独有的锁,但这没有帮助” . 它没有帮助的原因是

你想要序列化处理程序
    调用
  • ,而不是排队的时间 您无法对异步调用持有锁,因为它破坏了执行程序抽象:执行程序可能会导致在不同线程上调用处理程序,而不会持有锁。理论上,您可以在仅移动处理程序对象中保留一个
  • unique_lock
  • ,但我认为 99% 的情况下这是一种强烈的代码味道(也是导致死锁的原因)
    
    

住在Coliru static constexpr auto unit = 1000ms; asio::thread_pool ioc(2); // or leave the argument to get a system-dependent default size std::function<void(error_code error)> loop1, loop2; auto make_loop = [](auto& loop, int id, auto ex, auto otherEx) { auto interval = id * unit; loop = [=, &loop, t = std::make_shared<asio::steady_timer>(ex, interval)](error_code ec) { trace("Start Task", id, ": ", ec.message()); if (!ec) sleep_for(interval / 2); asio::post(otherEx, [id] { trace("job cross-posted from Task", id); }); t->expires_from_now(interval); t->async_wait(loop); }; asio::post(ex, bind(loop, error_code{})); // prime the pump }; auto task1 = make_strand(ioc); auto task2 = make_strand(ioc); make_loop(loop1, 1, task1, task2); make_loop(loop2, 2, task2, task1); trace("App started :", std::thread::hardware_concurrency()); ioc.join();

印刷

g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -DCOLIRU && ./a.out at 0ms - tid:0 main at 0ms - tid:1 Start Task1: Success at 0ms - tid:0 App started :4 at 0ms - tid:2 Start Task2: Success at 1000ms - tid:1 job cross-posted from Task2 at 1000ms - tid:2 job cross-posted from Task1 at 1500ms - tid:1 Start Task1: Success at 2001ms - tid:1 job cross-posted from Task1 at 3000ms - tid:2 Start Task2: Success at 3001ms - tid:1 Start Task1: Success at 4001ms - tid:2 job cross-posted from Task2 at 4001ms - tid:1 job cross-posted from Task1 at 4501ms - tid:2 Start Task1: Success at 5001ms - tid:2 job cross-posted from Task1 at 6001ms - tid:1 Start Task2: Success at 6001ms - tid:2 Start Task1: Success at 7001ms - tid:1 job cross-posted from Task2 at 7001ms - tid:2 job cross-posted from Task1 at 7502ms - tid:1 Start Task1: Success at 8002ms - tid:1 job cross-posted from Task1 at 9001ms - tid:2 Start Task2: Success at 9002ms - tid:1 Start Task1: Success at 10001ms - tid:0 All worker threads joined.

请注意,这实际上比您最初的重现少了 28 行代码 - 33% 吸脂率。

总结/结论

公平地说,您可能确实需要任务队列和专用线程,而不是硬塞 io_context 作为您的排队机制。使用上面的 IntervalTimer,您可以很好地集成它,让 Asio 管理计时器。

© www.soinside.com 2019 - 2024. All rights reserved.