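I have a simple class that uses boost::asio to execute a callback on another thread after a delay. The delay can be reset, so the class can also act as a watchdog.
The class works, but every now and then (roughly 10% of the time) it crashes with SIGSEGV when it is destroyed, and I can't figure out why. My first thought was that this is no longer valid (as this=0x0 in the backtrace suggests), but since we wait for the thread to join before the object finishes being destroyed, it should remain valid for the whole lifetime of the class. Or am I missing something?
Thanks for any help! Below are the class's .cpp code and the GDB stack trace. The member variables have the following types.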
boost::asio::chrono::milliseconds delay_ms_;
boost::asio::io_context io_;
boost::asio::steady_timer timer_;
boost::thread thread_;
std::mutex mutable mutex_;
bool is_running = false;
AsyncExecutor::AsyncExecutor(std::chrono::milliseconds const &delay_ms, Callback callback) :
delay_ms_(delay_ms),
callback_(callback),
io_(),
timer_(io_, delay_ms_)
{
}
AsyncExecutor::~AsyncExecutor() {
// Make sure the timer stops when the object is destroyed & callback is not called.
stop();
}
void AsyncExecutor::start() {
std::lock_guard<std::mutex> lock(mutex_);
if (is_running)
return;
is_running = true;
if (io_.stopped()) {
io_.reset(); // Reset io_service if it has stopped
}
timer_.expires_after(delay_ms_);
timer_.async_wait(boost::bind(&AsyncExecutor::timer_ran, this, boost::asio::placeholders::error));
// Start the io context in a separate thread.
// Once the timer fires, the thread will exit.
thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_));
}
void AsyncExecutor::stop() {
{
std::lock_guard<std::mutex> lock(mutex_);
if (!is_running)
return;
// Cancel the timer.
boost::asio::post(io_, [this]() {
this->timer_.cancel();
});
}
// Finally wait for the thread to finish.
if (this->thread_.joinable())
this->thread_.join();
std::lock_guard<std::mutex> lock(mutex_);
is_running = false;
}
void AsyncExecutor::reset() {
std::lock_guard<std::mutex> lock(mutex_);
if (!is_running)
return;
// Reset the timer.
boost::asio::post(io_, [this]() {
this->timer_.expires_after(this->delay_ms_);
this->timer_.async_wait(boost::bind(&AsyncExecutor::timer_ran, this, boost::asio::placeholders::error));
});
}
void AsyncExecutor::timer_ran(const boost::system::error_code &ec) {
{
std::lock_guard<std::mutex> lock(mutex_);
is_running = false;
}
// If the timer was cancelled, we don't want to execute the callback or detach the thread.
// Reset also calls this function with ec, so we want to keep the thread attached.
// This complicates things & means that join() must be called in stop() to wait for the thread to finish.
// But if the thread fires the callback, it will detach itself.
if (ec) {
return;
}
if (this->callback_)
this->callback_();
io_.stop();
// Detach the thread, as it is no longer needed. It will be cleaned up by the OS.
this->thread_.detach();
}
bool AsyncExecutor::is_timer_running() const {
std::lock_guard<std::mutex> lock(mutex_);
return is_running;
}
[gdb-7] [New Thread 0x7fff1a7fc000 (LWP 385144)]
[gdb-7] [Thread 0x7fff1a7fc000 (LWP 385075) exited]
[gdb-7]
[gdb-7] Thread 171 "scout_control_u" received signal SIGSEGV, Segmentation fault.
[gdb-7] [Switching to Thread 0x7fff2253c000 (LWP 385067)]
[gdb-7] boost::asio::detail::epoll_reactor::run (this=0x0, usec=<optimized out>, ops=...) at /usr/include/boost/asio/detail/impl/epoll_reactor.ipp:462
[gdb-7] 462 if (timer_fd_ == -1)
[gdb-7] #0 boost::asio::detail::epoll_reactor::run(long, boost::asio::detail::op_queue<boost::asio::detail::scheduler_operation>&)
[gdb-7] (this=0x0, usec=<optimized out>, ops=...)
[gdb-7] at /usr/include/boost/asio/detail/impl/epoll_reactor.ipp:462
[gdb-7] #1 0x00007ffff6f0aef3 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&)
[gdb-7] (this=this@entry=0x7fff1412e5f0, lock=..., this_thread=..., ec=...)
[gdb-7] at /usr/include/boost/asio/detail/impl/scheduler.ipp:465
[gdb-7] #2 0x00007ffff6f0bb01 in boost::asio::detail::scheduler::run(boost::system::error_code&) (this=0x7fff1412e5f0, ec=...)
[gdb-7] at /usr/include/boost/asio/detail/impl/scheduler.ipp:204
[gdb-7] #3 0x00007ffff6f0d552 in boost::asio::io_context::run() (this=<optimized out>)
[gdb-7] at /usr/include/boost/asio/impl/io_context.ipp:63
[gdb-7] #4 0x00007ffff1f570cb in ()
[gdb-7] at /lib/x86_64-linux-gnu/libboost_thread.so.1.74.0
[gdb-7] #5 0x00007ffff7094ac3 in start_thread (arg=<optimized out>)
[gdb-7] at ./nptl/pthread_create.c:442
[gdb-7] #6 0x00007ffff7126850 in clone3 ()
[gdb-7] at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
I expected the class to cancel the background task rather than crash.
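You are mixing io_service (!?) with io_context, using boost::thread instead of std::thread, and using a full mutex to synchronize a bool (make it atomic?), which is redundant anyway because the timer already knows everything it needs to know. Despite all that, reset() and stop() invoke undefined behavior, because the steady_timer instance is not thread-safe.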
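To make the "make it atomic?" remark concrete, here is a minimal sketch of a lone running flag kept in a std::atomic<bool> instead of behind a mutex. RunFlag and run_flag_demo are hypothetical names that do not appear in the question's code. Note that this only makes the flag itself safe; it does nothing about concurrent access to the timer.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper, not part of the original class.
class RunFlag {
public:
    // True only for the caller that flips the flag from false to true.
    bool try_start() noexcept { return !running_.exchange(true); }
    // True only for the caller that flips the flag from true to false.
    bool try_stop() noexcept { return running_.exchange(false); }
    bool running() const noexcept { return running_.load(); }

private:
    std::atomic<bool> running_{false};
};

// Small self-check of the intended semantics.
inline void run_flag_demo() {
    RunFlag f;
    assert(f.try_start());   // the first start wins
    assert(!f.try_start());  // a second start is a no-op
    assert(f.try_stop());
    assert(!f.running());
}
```

Because exchange() is a single atomic read-modify-write, two threads calling try_start() at the same time cannot both see "not running".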
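You are joining and detaching the thread for no good reason (there is no reason not to simply join it here). All in all, you are basically implementing a very expensive version of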
void on_timeout(Duration d, Callback cb) {
std::thread{[d, cb] {
std::this_thread::sleep_for(d);
cb();
}}.detach();
}
Perhaps the only added feature is that you can postpone it while it is pending by calling reset(). Consider not worrying about threads at all:
struct AsyncExecutor {
using Timer = asio::steady_timer;
AsyncExecutor(Duration delay_ms, Callback callback) : delay_ms_(delay_ms), callback_(callback) {}
~AsyncExecutor() {
trace("~dtor()");
auto f = timer_.async_wait(asio::use_future);
do_stop();
f.wait(); // ensure the timer is done
}
void start() { trace("start()"); do_start(false); }
void reset() { trace("reset()"); do_start(true); }
void stop() { trace("stop()"); do_stop(); }
private:
static constexpr Timer::time_point STOPPED = Timer::time_point::min();
asio::steady_timer timer_{make_strand(asio::system_executor{}), STOPPED};
Duration delay_ms_;
Callback callback_;
void do_start(bool require_running) {
dispatch(timer_.get_executor(), [this, require_running] {
if (require_running == (timer_.expiry() != STOPPED)) {
auto num_canceled = timer_.expires_after(delay_ms_);
trace(" [setting delay, canceled " + std::to_string(num_canceled) + "]");
timer_.async_wait(std::bind(&AsyncExecutor::timer_ran, this, _1));
}
});
}
std::future<size_t> do_stop() {
return dispatch(timer_.get_executor(),
asio::use_future([this] { return timer_.expires_at(STOPPED); }));
}
void timer_ran(boost::system::error_code ec) {
trace(" [timer_ran: " + ec.message() + "]");
if (ec != asio::error::operation_aborted) {
timer_.expires_at(STOPPED);
if (callback_)
callback_();
}
}
};
Note that all operations except construction/destruction are thread-safe.
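For comparison, if Asio is not a hard requirement, the same start/reset/stop behavior can be sketched with the standard library alone. This is only an illustration under assumptions: Watchdog and watchdog_demo are hypothetical names, and the design (one worker thread owned by the object, a condition variable, and an unconditional join in the destructor) is a swapped-in technique, not the approach shown above. The point it shares with the answer is that the worker is always joined, never detached, so it cannot outlive the object.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical std-only counterpart of the class; not from the original post.
class Watchdog {
public:
    using Clock = std::chrono::steady_clock;

    Watchdog(Clock::duration delay, std::function<void()> cb)
        : delay_(delay), cb_(std::move(cb)), worker_([this] { run(); }) {}

    ~Watchdog() {
        {
            std::lock_guard<std::mutex> lk(m_);
            quit_ = true;
        }
        cv_.notify_all();
        worker_.join();  // the worker never outlives *this
    }

    void start() { arm(false); }  // no effect if already pending
    void reset() { arm(true); }   // no effect unless pending
    void stop() {
        {
            std::lock_guard<std::mutex> lk(m_);
            armed_ = false;
        }
        cv_.notify_all();
    }

private:
    void arm(bool require_pending) {
        {
            std::lock_guard<std::mutex> lk(m_);
            if (armed_ != require_pending) return;
            armed_ = true;
            deadline_ = Clock::now() + delay_;
        }
        cv_.notify_all();
    }

    void run() {
        std::unique_lock<std::mutex> lk(m_);
        while (!quit_) {
            if (!armed_) {
                cv_.wait(lk);  // sleep until armed or asked to quit
            } else if (Clock::now() >= deadline_) {
                armed_ = false;
                lk.unlock();
                if (cb_) cb_();  // run the callback without holding the lock
                lk.lock();
            } else {
                auto d = deadline_;      // copy: reset() may change the member
                cv_.wait_until(lk, d);   // woken early by reset()/stop()/quit
            }
        }
    }

    Clock::duration delay_;
    std::function<void()> cb_;
    std::mutex m_;
    std::condition_variable cv_;
    Clock::time_point deadline_{};
    bool armed_ = false;
    bool quit_ = false;
    std::thread worker_;  // declared last so it starts after the other members exist
};

// Usage sketch: destroying a pending watchdog cancels it; returns the fire count.
inline int watchdog_demo() {
    using namespace std::chrono_literals;
    std::atomic<int> fired{0};
    {
        Watchdog w(10s, [&] { ++fired; });
        w.start();
        w.reset();  // pushes the deadline back
    }               // destructor wakes the worker and joins it
    assert(fired == 0);
    return fired.load();
}
```

The trade-off is one dedicated thread per object, which is the cost the Asio version avoids by running its handlers on the system executor.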
Demo program for the Asio-based AsyncExecutor above:
#include <boost/asio.hpp>
#include <fmt/core.h>
#include <chrono>
#include <functional>
#include <future>
#include <string>
#include <thread>
namespace asio = boost::asio;
using Callback = std::function<void()>;
using Duration = asio::steady_timer::duration;
using namespace std::placeholders;
using namespace std::chrono_literals;
namespace { // logging
constexpr auto now = std::chrono::steady_clock::now;
auto start = now();
void trace(std::string const& msg) { fmt::print("at {:4}ms {}\n", (now() - start) / 1ms, msg); }
} // namespace
struct AsyncExecutor {
using Timer = asio::steady_timer;
AsyncExecutor(Duration delay_ms, Callback callback) : delay_ms_(delay_ms), callback_(callback) {}
~AsyncExecutor() {
trace("~dtor()");
auto f = timer_.async_wait(asio::use_future);
do_stop();
f.wait(); // ensure the timer is done
}
void start() { trace("start()"); do_start(false); }
void reset() { trace("reset()"); do_start(true); }
void stop() { trace("stop()"); do_stop(); }
private:
static constexpr Timer::time_point STOPPED = Timer::time_point::min();
asio::steady_timer timer_{make_strand(asio::system_executor{}), STOPPED};
Duration delay_ms_;
Callback callback_;
void do_start(bool require_running) {
dispatch(timer_.get_executor(), [this, require_running] {
if (require_running == (timer_.expiry() != STOPPED)) {
auto num_canceled = timer_.expires_after(delay_ms_);
trace(" [setting delay, canceled " + std::to_string(num_canceled) + "]");
timer_.async_wait(std::bind(&AsyncExecutor::timer_ran, this, _1));
}
});
}
std::future<size_t> do_stop() {
return dispatch(timer_.get_executor(),
asio::use_future([this] { return timer_.expires_at(STOPPED); }));
}
void timer_ran(boost::system::error_code ec) {
trace(" [timer_ran: " + ec.message() + "]");
if (ec != asio::error::operation_aborted) {
timer_.expires_at(STOPPED);
if (callback_)
callback_();
}
}
};
using std::this_thread::sleep_for;
int main() {
{
trace("--- Main started");
AsyncExecutor job(1500ms, [] {trace("~~~RUNNING TIMED JOB~~~"); });
sleep_for(500ms);
job.reset(); // no effect, not pending
job.start();
sleep_for(200ms);
job.start(); // no effect, already pending
trace("--- Main keeping the job alive");
for (auto i = 0; i < 5; ++i) {
job.reset(); // restarts the wait of 1500ms
sleep_for(1.0s);
}
trace("--- Main loop done");
sleep_for(1.0s); // this allows job to run
job.reset(); // no effect, not pending
job.start();
sleep_for(200ms);
} // destructor cancels the job
trace("--- Main exit");
}
This prints, for example: