如何避免并发回调用户定义的例程?

问题描述 投票:0回答:1

我正在尝试修改一些Boost代码,使其与AutoHotKey兼容。原始项目可以在here找到。我的版本可以在这里找到。我可以使用一些帮助来确定如何防止多个并发回调进入用户提供的 AutoHotKey 例程。

这是现有的 on_read 回调 --

/// Callback registered by async_read. It calls user registered callback to actually process the data. And then issue another async_read to wait for data from server again.
    /// \param ec instance of error code
    /// \param bytes_transferred
    void
    on_read(
            beast::error_code ec,
            std::size_t bytes_transferred) {
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> in on read" << std::endl;
        }       
        boost::ignore_unused(bytes_transferred);

        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            if(!Is_Connected) {
                return;
            }

        }

        // error occurs
        if (ec) {
            if(on_fail_cb)
                on_fail_cb(L"read");
            return fail(ec, L"read");
        }

        const std::string data = beast::buffers_to_string(buffer_.data());
        const std::wstring wdata(data.begin(), data.end());
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << L"] " << wdata << std::endl;
        }       

//  The next section is where my issue resides

        if (on_data_cb)
            on_data_cb(wdata.c_str(), wdata.length());

        buffer_.consume(buffer_.size());

        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
        }       
        ws_.async_read(
                buffer_,
                beast::bind_front_handler(
                        &session::on_read,
                        shared_from_this()));

        // Close the WebSocket connection
        // ws_.async_close(websocket::close_code::normal,
        //     beast::bind_front_handler(
        //         &session::on_close,
        //         shared_from_this()));
    }

代码

if (on_data_cb) on_data_cb(wdata.c_str(), wdata.length());
将回调执行到AutoHotKey中,我需要知道如何防止它一次执行多次。我不太精通 C++ / Boost,所以请温柔一点。 ;-)

c++ boost boost-asio autohotkey
1个回答
0
投票

温和的答案是指向文档:Strands:使用没有显式锁定的线程

实际上,您没有显示足够的代码。例如,我们无从得知

  • 正在使用什么执行上下文。如果您使用带有单个服务线程的

    io_context
    run()
    ,那么您已经拥有隐式链并保证没有处理程序同时运行

  • IO 对象绑定到哪个执行器。在您的代码中,唯一可见的对象是

    ws_
    ,我们假设它类似于

     net::io_context                ctx_;
     websocket::stream<tcp::socket> ws_{ctx_};
    

    现在,如果您想要多个线程服务

    ctx_
    ,您可以将
    ws_
    绑定到链执行器:

     websocket::stream<tcp::socket> ws_{make_strand(ctx_)};
    

    现在,只要您确保自己的访问(例如 async_ 启动)位于正确的链上,您的代码就已经安全了。如果您愿意 - 并且您不介意对执行器类型进行硬编码,您可以断言:

    自动链= ws_.get_executor().targetnet::strand(); 断言(股 && 股->running_in_this_thread());

专业提示:

如果您确实致力于特定的执行程序类型,请考虑静态绑定该类型:

using Context  = net::io_context::executor_type;
using Executor = net::io_context::executor_type;
using Strand   = net::strand<net::io_context::executor_type>;
using Socket   = net::basic_stream_socket<tcp, Strand>;

Context                   ctx_;
websocket::stream<Socket> ws_{make_strand(ctx_)};

这避免了类型擦除执行器的开销,您可以 简化断言:

assert(ws_.get_executor().running_in_this_thread());

旁注

演示

强制性“实时”代码:

住在科里鲁

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <iostream>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using net::ip::tcp;

static std::mutex s_consoleMtx;

static void fail(beast::error_code ec, std::string txt) {
    std::cerr << txt << ": " << ec.message() << " at " << ec.location() << std::endl;
}

#define ARCH_LABEL "STACKO"
struct session : std::enable_shared_from_this<session> {
    using Context  = net::io_context::executor_type;
    using Executor = net::io_context::executor_type;
    using Strand   = net::strand<net::io_context::executor_type>;
    using Socket   = net::basic_stream_socket<tcp, Strand>;

    Context                   ctx_;
    websocket::stream<Socket> ws_{make_strand(ctx_)};

    static bool const  EnableVerbose = true;
    std::atomic_bool   Is_Connected  = false;
    beast::flat_buffer buffer_;

    std::function<void(std::string)>         on_fail_cb;
    std::function<void(char const*, size_t)> on_data_cb;

    /// Callback registered by async_read. It calls user registered
    /// callback to actually process the data. And then issue another
    /// async_read to wait for data from server again. 
    /// \param ec instance of error code 
    /// \param bytes_transferred
    void on_read(beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> in on read" << std::endl;
        }

        if (!Is_Connected)
            return;

        // error occurs
        if (ec) {
            if (on_fail_cb)
                on_fail_cb("read");
            return fail(ec, "read");
        }

        std::string const data = beast::buffers_to_string(buffer_.data());
        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << "] " << data << std::endl;
        }

        if (on_data_cb)
            on_data_cb(data.c_str(), data.length());

        buffer_.consume(buffer_.size());

        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
        }

        assert(ws_.get_executor().running_in_this_thread());
        ws_.async_read(buffer_, beast::bind_front_handler(&session::on_read, shared_from_this()));
    }
};
© www.soinside.com 2019 - 2024. All rights reserved.