boost::interprocess::message_queue 上的随机 boost::interprocess_exception::library_error

问题描述 投票:0回答:1

我使用消息队列为 IPC 创建了一个模板类。
我正在无限 while 循环(称为主循环)中运行我的程序。
我通过以太网从各种子系统(传感器)收集数据,并使用消息队列将接收到的数据传递给相应的进程(有多个不同的进程可以充当数据接收方,每个进程都有自己的消息队列)。
我刚刚运行了该程序,没有执行任何活动。这是唯一运行的程序,每次运行之前我都会重新启动操作系统。
该程序只是在 while 循环中运行,其中所有标志都设置为 false;因此该程序只是运行一个空白(空)循环。
程序会随机地抛出 boost::interprocess_exception::library_error。由于程序没有任何活动,我预期不应该出现任何错误。

我注释掉了以太网相关代码,但仍然遇到相同的错误。

错误出现在下面这条语句上:

if (primaryNode == true)
{
    this->mSecondaryToPrimaryMessageQueue->receive(
        &receiveData,
        sizeof(receiveData),
        receiveLength,
        priority
    );
}
else
{
    this->mPrimaryToSecondaryMessageQueue->receive(
        &receiveData,
        sizeof(receiveData),
        receiveLength,
        priority
    );
}

我尝试过将 primaryNode 分别设置为 true 和 false,两种情况下都会遇到同样的错误。

代码:

ipc.hpp

#pragma once

#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <type_traits>
#include <variant>

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>

/// @brief Bidirectional IPC endpoint built on two Boost.Interprocess message queues.
///
/// One queue carries messages from the primary node to the secondary node, the
/// other carries messages in the opposite direction. open() starts a listener
/// thread that stores every received message in a bounded single-producer /
/// single-consumer queue and then invokes the registered callback.
///
/// @note The class declares no destructor. A std::thread that is still joinable
///       when it is destroyed terminates the program, so an opened Ipc object
///       must outlive its listener thread.
///
/// @tparam T1 Specifies the data-type that has to be sent
/// @tparam T2 Specifies the data-type that will be received
/// @tparam primaryNode Denotes if the RTP is the primary node (owner/creator of the message queue)
template<typename T1, typename T2, bool primaryNode>
class Ipc
{
private:
    // `static constexpr` members are implicitly inline in C++17. A plain
    // `static const` member would need an out-of-class definition as soon as it
    // is bound to a reference (e.g. when forwarded through std::make_unique).
    static constexpr std::uint8_t MAX_MESSAGE_DEPTH = 5; // Number of messages each queue can hold

    using callback_t = std::function<void(void)>;
    callback_t mCallback; // Invoked by the listener thread after each received message

    std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue;
    std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue;

    std::string         mPrimaryToSecondaryMessageQueueName;
    std::string         mSecondaryToPrimaryMessageQueueName;

    std::thread         mReceiveThread;              // Runs listen()
    std::atomic_bool    mExitReceiveThread{ false }; // Set by close() to ask listen() to stop
    boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue;

    /// Listener loop executed on mReceiveThread.
    void listen(void);
public:
    Ipc() = default;

    /// Opens (or creates) both message queues derived from queueName and starts the listener thread.
    bool open(const std::string& queueName);
    /// Asks the listener to stop and removes both message queues.
    bool close(void);
    /// Sends one message on the outbound queue for this node's role.
    bool send(const T1& data, std::uint32_t priority = 10);
    /// Pops the oldest message stored by the listener, if any.
    std::optional<T2> receive(void);
    /// Stores the callback that the listener invokes after each received message.
    bool register_callback(callback_t callback_implementation);
    /// Reports whether receive() would currently return a message.
    bool isDataAvailableInReceiveDataQueue(void) const;
};


/// @brief Listener loop executed on the receive thread started by open().
///
/// Waits on the inbound queue for this node's role (secondary-to-primary on the
/// primary node, primary-to-secondary otherwise). Every complete message is
/// stored in mReceiveDataQueue and the registered callback, if any, is invoked.
/// Exceptions are logged to std::cout and the loop keeps running.
///
/// @note message_queue::receive() blocks until a message arrives, so the exit
///       flag is only re-evaluated between messages.
template<typename T1, typename T2, bool primaryNode>
inline void Ipc<T1, T2, primaryNode>::listen(void)
{
    // The queue hands over raw bytes, which is only valid for trivially copyable types.
    static_assert(std::is_trivially_copyable_v<T2>,
                  "Ipc: the received type must be trivially copyable");

    using Queue = boost::interprocess::message_queue;

    while (this->mExitReceiveThread.load() == false)
    {
        try
        {
            T2               receiveData{};       // Buffer to store received data, reset for every message
            Queue::size_type receiveLength = 0;   // Must be size_type: receive() takes it by reference
            unsigned int     priority = 0;

            Queue& inboundQueue = primaryNode
                ? *this->mSecondaryToPrimaryMessageQueue
                : *this->mPrimaryToSecondaryMessageQueue;

            inboundQueue.receive(&receiveData, sizeof(receiveData), receiveLength, priority);

            if (receiveLength != sizeof(receiveData))
            {
                // A message of another size cannot be a valid T2; do not hand it to the consumer.
                std::cout << "Listen: discarded message of unexpected size " << receiveLength << std::endl;
                continue;
            }

            if (this->mReceiveDataQueue.push(receiveData) == false)
            {
                // The consumer has not drained the queue; make the loss visible.
                std::cout << "Listen: receive queue full, message dropped" << std::endl;
            }

            if (this->mCallback) // Calling an empty std::function throws std::bad_function_call
            {
                this->mCallback();
            }
        }
        catch (const std::exception& ex)
        {
            std::cout << "Inside Listen Exception\n";
            std::cout << ex.what() << std::endl;
        }
    }
}

/// @brief Opens (or creates) both message queues and starts the listener thread.
///
/// The queues are named "<queueName>_out" (primary to secondary) and
/// "<queueName>_in" (secondary to primary). Each queue is sized for the type
/// that actually travels on it: the primary node sends T1 and receives T2,
/// while the secondary node sends T1 on the secondary-to-primary queue and
/// receives T2 from the primary-to-secondary queue.
///
/// @param queueName Base name shared by both nodes.
/// @return true on success; false if the endpoint is already open or if
///         opening a queue or starting the thread threw.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
{
    using Queue = boost::interprocess::message_queue;

    if (this->mReceiveThread.joinable())
    {
        // Assigning to a joinable std::thread would terminate the program.
        std::cout << "Ipc::open called on an endpoint that is already open" << std::endl;
        return false;
    }

    try
    {
        // Queue names are the same for both roles; only the direction of use differs.
        this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
        this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");

        // Payload size per direction, expressed in terms of this node's T1 (sent) and T2 (received).
        constexpr Queue::size_type primaryToSecondarySize = primaryNode ? sizeof(T1) : sizeof(T2);
        constexpr Queue::size_type secondaryToPrimarySize = primaryNode ? sizeof(T2) : sizeof(T1);

        // Local copy: forwarding the static member by reference would odr-use it.
        const Queue::size_type maxDepth = MAX_MESSAGE_DEPTH;

        //Open-Create message queue to send data from primaryNode node to secondary node
        this->mPrimaryToSecondaryMessageQueue = std::make_unique<Queue>(
            boost::interprocess::open_or_create,
            this->mPrimaryToSecondaryMessageQueueName.c_str(),
            maxDepth,
            primaryToSecondarySize
        );

        //Open-Create message queue to send data from secondary node to primaryNode node
        this->mSecondaryToPrimaryMessageQueue = std::make_unique<Queue>(
            boost::interprocess::open_or_create,
            this->mSecondaryToPrimaryMessageQueueName.c_str(),
            maxDepth,
            secondaryToPrimarySize
        );

        //Start Listener Thread
        this->mReceiveThread = std::thread(&Ipc::listen, this);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

/// @brief Asks the listener thread to stop and removes both message queues.
///
/// The listener thread is not joined here: it only notices the exit flag after
/// its current blocking receive returns.
///
/// @return true if both queues were removed; false if at least one could not
///         be removed (for example because the peer already removed it) or if
///         an exception was thrown.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::close(void)
{
    try
    {
        this->mExitReceiveThread.store(true); //Marked to close thread

        // Attempt both removals even if the first one fails.
        const bool removedPrimaryToSecondary =
            boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str());
        const bool removedSecondaryToPrimary =
            boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str());

        // The original fell off the end of a non-void function here (undefined behaviour).
        return removedPrimaryToSecondary && removedSecondaryToPrimary;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

/// @brief Sends one message on the outbound queue for this node's role.
///
/// The primary node sends on the primary-to-secondary queue, the secondary node
/// on the secondary-to-primary queue.
///
/// @param data     Message to send; its object representation is copied into the queue.
/// @param priority Message priority passed to the queue.
/// @return true if the message was handed to the queue; false if the endpoint
///         has not been opened successfully or the queue threw.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
{
    try
    {
        auto& outboundQueue = primaryNode
            ? this->mPrimaryToSecondaryMessageQueue   //Send message on Primary to Secondary Queue
            : this->mSecondaryToPrimaryMessageQueue;  //Send message on Secondary to Primary Queue

        if (!outboundQueue)
        {
            // open() failed or was never called; dereferencing the null pointer would be undefined behaviour.
            std::cout << "Ipc::send called before a successful open()" << std::endl;
            return false;
        }

        outboundQueue->send(&data, sizeof(data), priority);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

/// @brief Pops the oldest message stored by the listener thread.
/// @return The message, or std::nullopt when the receive queue is empty.
template<typename T1, typename T2, bool primaryNode>
inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
{
    // Nothing queued: report "no value" right away.
    if (this->mReceiveDataQueue.read_available() == 0)
    {
        return std::nullopt;
    }

    // Copy the first element out before removing it from the queue.
    std::optional<T2> oldest{ this->mReceiveDataQueue.front() };
    this->mReceiveDataQueue.pop();
    return oldest;
}

/// @brief Stores the callback that the listener invokes after each received message.
/// @param callbackImplementation Callable taking no arguments and returning nothing.
/// @return true once the callback has been stored; false if storing it threw.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
{
    bool registered = false;
    try
    {
        this->mCallback = callbackImplementation;
        registered = true;
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << '\n';
    }
    return registered;
}

/// @brief Reports whether at least one received message is waiting in the receive queue.
/// @return true if receive() would currently return a message, false otherwise.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
{
    const auto pendingMessages = this->mReceiveDataQueue.read_available();
    return pendingMessages > 0;
}

main.cpp

#include <ipc.hpp>
#include <iostream>

//P1 stands for Process 1
//P2 stands for Process 2
// Message payload travelling from process 1 to process 2.
struct P1ToP2
{
    float a;
    int b;
};

// Message payload travelling from process 2 to process 1.
struct P2ToP1
{
    int a;
    int b;
};

// Global IPC object: sends P1ToP2, receives P2ToP1, and is instantiated with
// primaryNode = false (i.e. as the secondary node).
Ipc<P1ToP2, P2ToP1, false> ipc1; //Global IPC object

// Callback registered with ipc1: takes the oldest received message, if there
// is one, and prints its two fields.
void message_queue_data_received(void)
{
    if (!ipc1.isDataAvailableInReceiveDataQueue())
    {
        return;
    }

    const auto message = ipc1.receive();
    if (!message.has_value())
    {
        return;
    }

    std::cout << "a : " << message->a << "\tb : " << message->b << std::endl;
}

// Program entry point: registers the receive callback, opens the IPC endpoint
// and then runs the main loop, forwarding data over IPC whenever the flag is set.
int main(int argc, char *argv[])
{
    bool dataReceivedOnEthernet = false;

    ipc1.register_callback(message_queue_data_received);

    // `this` does not exist in a free function; ipc1 is a global object.
    if (ipc1.open("ipc1") == false)
    {
        // Without open queues every later send() would fail, so stop here.
        std::cerr << "Failed to open IPC queues for ipc1" << std::endl;
        return 1;
    }

    while(true)
    {
        if(dataReceivedOnEthernet == true) //Flag set by other thread
        {
            P1ToP2 p;
            p.a = 10.23f; //Some Data received over ethernet
            p.b = 10; //Some Data received over ethernet
            ipc1.send(p); //Send data over IPC
        }
        //Other Code
    }
}

错误

boost::interprocess_exception::library_error
c++ boost boost-interprocess
1个回答
0
投票

为什么各个进程对消息使用不同的类型,同时又默默地假设它们具有相同的大小(以及都是平凡(trivial)类型、标准布局类型等等)?您是否把各种类型和队列弄混了?看起来是这样的。

如果能给各个事物起个好名字,会很有帮助。另外,请去掉重复的代码。

我会按消息类型分隔队列。按角色命名:

// Requests are sent by client to server.
struct Request {
    int a;
    int b;
};

// Responses are sent by server to client.
struct Response {
    double a;
    int    b;
};

// The third template argument selects the role: true = client, false = server.
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;

然后,通过定义一个 Channel 类型:

// Message priority type used by the channel API.
using Prio = uint32_t;

// One named message queue carrying messages of type T.
// Only the declarations of the constructor, destructor, receive() and send()
// are shown in this snippet.
template <typename T> struct Channel {
    Channel(std::string name);
    ~Channel();

    // Returns a received message together with its priority.
    std::tuple<T, Prio> receive(std::stop_token token);
    // Sends msg with the given priority; returns a success flag.
    bool send(T const& msg, Prio prio, std::stop_token token);

  private:
    using Queue = boost::interprocess::message_queue;
    std::string name_;
    // Initialised from open(), which reads name_; name_ must therefore stay declared first.
    Queue       queue_ = open();

    // Clients only open an existing queue; otherwise the queue is opened or
    // created with room for MAX_DEPTH messages of sizeof(T) bytes each.
    Queue open() {
        if constexpr (IsClient) {
            return {boost::interprocess::open_only, name_.c_str()};
        } else {
            return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
        }
    }
};

现在我们可以简单地说:

// One channel per message type: reqQ carries Request messages, resQ carries Response messages.
Channel<Request>  reqQ;
Channel<Response> resQ;

具有类似的构造

 Ipc(std::string const& queueName) try
    : reqQ(queueName + "_req")
    , resQ(queueName + "_res")
    , mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
} catch (std::exception const& ex) {
    std::cerr << "Ipc: " << ex.what() << std::endl;
    throw;
}

监听器将收到的消息排队。类型取决于客户端/服务器模式:

// Type of the messages this side receives: Response on the client, Request on the server.
using Incoming = std::conditional_t<IsClient, Response, Request>;
// Bounded queue (capacity MAX_DEPTH) holding received messages until they are consumed.
boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;

注意我选择了更安全的 jthread,并使用停止令牌(stop token)来协调线程退出:

// Listener thread.
std::jthread      mThread;
// Stop token obtained from mThread; it must stay declared after mThread because
// its initializer reads mThread.
std::stop_token   mToken = mThread.get_stop_token();

// Listener loop: until a stop is requested, receive the next incoming message
// (from resQ on a client, from reqQ on a server), store it in mInbox and invoke
// the callback if one is set. Exceptions are logged to std::cerr and the loop continues.
void listen(std::stop_token token) {
    for (;;) {
        if (token.stop_requested())
            break;

        try {
            if constexpr (IsClient) {
                auto received = resQ.receive(token);
                mInbox.push(std::get<0>(std::move(received)));
            } else {
                auto received = reqQ.receive(token);
                mInbox.push(std::get<0>(std::move(received)));
            }

            if (mCallback) {
                mCallback();
            }
        } catch (std::exception const& ex) {
            std::cerr << "Listen: " << ex.what() << std::endl;
        }
    }
}

外部操作看起来简单多了,比如:

void wait() { mThread.join(); }

void close() {
    mThread.request_stop();
    if (std::this_thread::get_id() != mThread.get_id())
        wait();
}

bool send(Request  const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

std::optional<Incoming> consume() {
    if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
        return val;
    return {};
}

void register_callback(callback_t cb) { mCallback = cb; }
bool haveMessage() const { return mInbox.read_available(); }

示例客户端/服务器

让我们定义上面的服务器,它通过发回 a 的平方根和 b/2 来响应 Request:

void server() {
    ServerIpc ipc(IPC_NAME);

    auto handler = [&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Request> req = ipc.consume()) {
            auto [a, b] = *req;
            std::cout << "server received request a:" << a << " b:" << b << std::endl;

            if (a == -42 && b == -42) {
                std::cout << " -> server handling close request" << std::endl;
                ipc.close();
            } else {
                // send response
                ipc.send(Response{sqrt(a), b / 2});
            }
        }
    };

    ipc.register_callback(handler);
    ipc.wait();
}

仅此而已。请注意我们如何添加一种机制来告诉服务器客户端希望它退出(因为服务器拥有资源)。客户端可能看起来像这样:

void client() {
    ClientIpc ipc(IPC_NAME);

    auto handler = [&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Response> res = ipc.consume()) {
            auto [a, b] = *res;
            std::cout << "client received response a:" << a << " b:" << b << std::endl;
        }
    };

    ipc.register_callback(handler);

    for (int i = 0; i < 100; ++i) {
        if (rand() % 30 == 0)            // Flag set by other thread
            ipc.send(Request{i, 2 * i}); // Send request

        std::this_thread::sleep_for(10ms);
    }

    std::cout << "Client sending close command" << std::endl;
    ipc.send(Request{-42, -42});

    std::cout << "Closing" << std::endl;
    ipc.close();
}

它所做的只是在一小段时间内(循环 100 次,每次休眠 10ms,约 1 秒)随机发送一些请求并记录响应。然后它告诉服务器退出,并关闭自己。只有服务器会删除队列。

一个用于在客户端/服务器之间切换的简单 main 函数:

// Runs the server when "server" appears among the command-line arguments,
// otherwise runs the client.
int main(int argc, char** argv) {
    const std::set<std::string_view> args(argv + 1, argv + argc);
    const bool runAsServer = args.count("server") != 0;

    if (runAsServer) {
        server();
    } else {
        client();
    }

    std::cout << "Bye" << std::endl;
}

现场演示

在 Coliru 上在线运行¹

  • 文件

    test.h

     #pragma once
    
     #include <boost/interprocess/ipc/message_queue.hpp>
     #include <boost/lockfree/spsc_queue.hpp>
     #include <iostream>
     #include <optional>
     #include <thread>
     using namespace std::chrono_literals;
    
     /// @brief Request/response IPC endpoint built on two Boost.Interprocess message queues.
     ///
     /// Requests travel on the "<name>_req" queue and responses on the "<name>_res" queue.
     /// A listener thread receives the incoming message type (Response on a client,
     /// Request on a server), stores it in a bounded inbox and invokes the callback.
     ///
     /// @tparam Request  Message type carried by the request queue.
     /// @tparam Response Message type carried by the response queue.
     /// @tparam IsClient true: only open existing queues. false: open or create the
     ///                  queues and remove them again on destruction.
     template <typename Request, typename Response, bool IsClient> class Ipc {
       private:
         static constexpr uint8_t MAX_DEPTH = 5; // Capacity of each message queue and of the inbox

         using callback_t = std::function<void()>;
         // NOTE(review): written by register_callback() and read by the listener thread
         // without synchronisation -- confirm callers register before messages arrive.
         callback_t mCallback;

         using Prio = uint32_t;

         /// One named message queue carrying messages of type T.
         template <typename T> struct Channel {
             Channel(std::string name) : name_(std::move(name)) { //
                 // queue_ has already been opened by its default member initializer.
                 assert(queue_.get_max_msg_size() == sizeof(T));
             }

             ~Channel() {
                 // Only the non-client (server) side removes the queue.
                 if constexpr (!IsClient) {
                     std::cerr << "Server cleaning up " << name_ << std::endl;
                     Queue::remove(name_.c_str());
                 }
             }

             /// Waits for the next message in 50ms slices so a stop request is noticed promptly.
             /// @return The message and its priority.
             /// @throws std::runtime_error when a stop was requested before a message arrived.
             std::tuple<T, Prio> receive(std::stop_token token) {
                 size_t len  = 0;
                 Prio   prio = 0;
                 T      msg{};

                 while (!token.stop_requested()) {
                     auto deadline = std::chrono::steady_clock::now() + 50ms;
                     if (queue_.timed_receive(&msg, sizeof(msg), len, prio, deadline)) {
                         assert(len == sizeof(T));
                         return {std::move(msg), prio};
                     }
                 }

                 throw std::runtime_error("stop requested");
             }

             /// Tries to send in 50ms slices until it succeeds or a stop is requested.
             /// @return true if the message was sent, false if a stop was requested first.
             bool send(T const& msg, Prio prio, std::stop_token token) {
                 while (!token.stop_requested()) {
                     auto deadline = std::chrono::steady_clock::now() + 50ms;
                     if (queue_.timed_send(&msg, sizeof(msg), prio, deadline))
                         return true;
                 }
                 return false;
             }

           private:
             using Queue = boost::interprocess::message_queue;
             std::string name_;
             Queue       queue_ = open(); // open() reads name_, so name_ must be declared first

             Queue open() {
                 if constexpr (IsClient) {
                     return {boost::interprocess::open_only, name_.c_str()};
                 } else {
                     return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
                 }
             }
         };

         Channel<Request>  reqQ;
         Channel<Response> resQ;

         using Incoming = std::conditional_t<IsClient, Response, Request>;
         boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;

         // The thread is declared after everything listen() touches. Members are
         // constructed in declaration order and destroyed in reverse, so the
         // listener starts only once the channels and the inbox exist, and it is
         // stopped and joined before any of them is destroyed.
         std::jthread      mThread;
         std::stop_token   mToken = mThread.get_stop_token();

         /// Listener loop: stores each incoming message in the inbox and invokes the callback.
         void listen(std::stop_token token) {
             while (!token.stop_requested()) {
                 try {
                     bool stored = false;
                     if constexpr (IsClient)
                         stored = mInbox.push(get<0>(resQ.receive(token)));
                     else
                         stored = mInbox.push(get<0>(reqQ.receive(token)));

                     if (!stored) // inbox full: make the loss visible instead of dropping silently
                         std::cerr << "Listen: inbox full, message dropped" << std::endl;

                     if (mCallback)
                         mCallback();
                 } catch (std::exception const& ex) {
                     std::cerr << "Listen: " << ex.what() << std::endl;
                 }
             }
         }

       public:
         /// Opens the "<queueName>_req" and "<queueName>_res" channels and starts the listener.
         /// Any std::exception thrown while doing so is logged and rethrown.
         Ipc(std::string const& queueName) try
             : reqQ(queueName + "_req")
             , resQ(queueName + "_res")
             , mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
         } catch (std::exception const& ex) {
             std::cerr << "Ipc: " << ex.what() << std::endl;
             throw;
         }

         /// Blocks until the listener thread has finished; does nothing if it was already joined.
         void wait() {
             if (mThread.joinable())
                 mThread.join();
         }

         /// Requests the listener to stop and waits for it, unless called from the listener itself.
         void close() {
             mThread.request_stop();
             if (std::this_thread::get_id() != mThread.get_id())
                 wait();
         }

         bool send(Request  const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
         bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

         /// Takes one message out of the inbox; returns an empty optional if there is none.
         std::optional<Incoming> consume() {
             if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
                 return val;
             return {};
         }

         void register_callback(callback_t cb) { mCallback = std::move(cb); }
         bool haveMessage() const { return mInbox.read_available() > 0; }
     };
    
  • 文件

    test.cpp

     #include "test.h"
     #include <cmath>
     #include <set>
    
     #include <boost/asio.hpp>
     // Signal set for SIGTERM and SIGINT, constructed on the system executor.
     // NOTE(review): not referenced by any other code visible in this listing.
     static boost::asio::signal_set signals(boost::asio::system_executor{}, SIGTERM, SIGINT);
    
     // Requests are sent by client to server.
     struct Request {
         int a;
         int b;
     };

     // Responses are sent by server to client.
     struct Response {
         double a;
         int    b;
     };

     // The third template argument selects the role: true = client, false = server.
     using ClientIpc = Ipc<Request, Response, true>;
     using ServerIpc = Ipc<Request, Response, false>;
    
     // Base name of the message queues shared by client and server. It is never
     // modified, so it is declared const.
     static const std::string IPC_NAME = "so_demo_ipc";
     void server() {
         ServerIpc ipc(IPC_NAME);
    
         auto handler = [&ipc] {
             assert(ipc.haveMessage());
             if (std::optional<Request> req = ipc.consume()) {
                 auto [a, b] = *req;
                 std::cout << "server received request a:" << a << " b:" << b << std::endl;
    
                 if (a == -42 && b == -42) {
                     std::cout << " -> server handling close request" << std::endl;
                     ipc.close();
                 } else {
                     // send response
                     ipc.send(Response{sqrt(a), b / 2});
                 }
             }
         };
    
         ipc.register_callback(handler);
         ipc.wait();
     }
    
     void client() {
         ClientIpc ipc(IPC_NAME);
    
         auto handler = [&ipc] {
             assert(ipc.haveMessage());
             if (std::optional<Response> res = ipc.consume()) {
                 auto [a, b] = *res;
                 std::cout << "client received response a:" << a << " b:" << b << std::endl;
             }
         };
    
         ipc.register_callback(handler);
    
         for (int i = 0; i < 100; ++i) {
             if (rand() % 30 == 0)            // Flag set by other thread
                 ipc.send(Request{i, 2 * i}); // Send request
    
             std::this_thread::sleep_for(10ms);
         }
    
         std::cout << "Client sending close command" << std::endl;
         ipc.send(Request{-42, -42});
    
         std::cout << "Closing" << std::endl;
         ipc.close();
     }
    
     // Runs the server when "server" appears among the command-line arguments,
     // otherwise runs the client.
     int main(int argc, char** argv) {
         const std::set<std::string_view> args(argv + 1, argv + argc);
         const bool runAsServer = args.count("server") != 0;

         if (runAsServer) {
             server();
         } else {
             client();
         }

         std::cout << "Bye" << std::endl;
     }
    

本地现场演示:

¹ 遗憾的是,在线编译器不允许共享内存访问

© www.soinside.com 2019 - 2024. All rights reserved.