我使用消息队列为 IPC 创建了一个模板类。
我正在无限 while 循环(称为主循环)中运行我的程序。
我通过以太网从各种子系统(传感器)收集数据,并使用消息队列将接收到的数据传递到适当的进程(它们是多个不同的进程,可以充当数据接收器,每个进程都有自己的消息队列)。
我刚刚运行了该程序,没有执行任何活动。这是唯一运行的程序,每次运行之前我都会重新启动操作系统。
该程序只是在 while 循环中运行,其中所有标志都设置为 false;因此该程序只是运行一个空白(空)循环。
随机地我得到了
boost::interprocess_exception::library_error
。由于没有任何活动,我预计不应该出现任何错误。我注释掉了以太网相关代码,但仍然遇到相同的错误。
我在声明中遇到错误:
if (primaryNode == true)
{
this->mSecondaryToPrimaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
else
{
this->mPrimaryToSecondaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
我尝试将 PrimaryNode 设置为 true 或 false。我也遇到同样的错误。
代码:
ipc.hpp
#pragma once
#include <thread>
#include <string>
#include <atomic>
#include <memory>
#include <variant>
#include <optional>
#include <iostream>
#include <functional>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
/// @brief
/// @tparam T1 Specifies the data-type that has to be sent
/// @tparam T2 Specifies the data-type that will be received
/// @tparam primaryNode Denotes whether this process is the primary node, i.e. the owner/creator of the message queue
template<typename T1, typename T2, bool primaryNode>
class Ipc
{
private:
static const std::uint8_t MAX_MESSAGE_DEPTH = 5; //Maximum number of messages each message queue (and the local receive buffer) can hold
using callback_t = std::function<void(void)>;
callback_t mCallback; //User callback invoked from the listener thread after a message has been buffered
std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue; //Shared queue carrying primary -> secondary messages
std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue; //Shared queue carrying secondary -> primary messages
std::string mPrimaryToSecondaryMessageQueueName; //OS-level name of the primary -> secondary queue (queueName + "_out")
std::string mSecondaryToPrimaryMessageQueueName; //OS-level name of the secondary -> primary queue (queueName + "_in")
std::thread mReceiveThread; //Listener thread started by open(); runs listen()
std::atomic_bool mExitReceiveThread{ false }; //Set by close() to request listener-thread exit
boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue; //Lock-free hand-off buffer between the listener thread and receive()
void listen(void); //Listener-thread body: blocking receive loop on the inbound queue
public:
Ipc() {}
bool open(const std::string& queueName); //Creates/opens both queues and starts the listener thread; false on failure
bool close(void); //Requests listener exit and removes both queues from the system
bool send(const T1& data, std::uint32_t priority = 10); //Posts one message on this node's outbound queue
std::optional<T2> receive(void); //Pops one buffered inbound message, if any
bool register_callback(callback_t callback_implementation); //Registers the message-arrival callback
bool isDataAvailableInReceiveDataQueue(void) const; //True when at least one received message is buffered
};
/// @brief Listener-thread body: blocks on the inbound message queue, buffers
/// every received message in the lock-free queue and notifies the user via
/// the registered callback.
/// NOTE(review): the blocking receive() only returns when a message arrives,
/// so mExitReceiveThread is re-checked only after each message — a
/// timed_receive would be needed for prompt shutdown; confirm intent.
template<typename T1, typename T2, bool primaryNode>
inline void Ipc<T1, T2, primaryNode>::listen(void)
{
    // Use the queue's own out-parameter types so they match
    // boost::interprocess::message_queue::receive(void*, size_type, size_type&, unsigned int&).
    // (The original used std::uint64_t, which is not size_type on all platforms.)
    boost::interprocess::message_queue::size_type receiveLength = 0;
    unsigned int priority = 0;
    while (this->mExitReceiveThread.load() == false)
    {
        try
        {
            T2 receiveData{}; // Value-initialized buffer; replaces std::memset, which required the missing <cstring>
            receiveLength = 0;
            priority = 0;
            // The primary node listens on the secondary->primary queue and vice versa.
            if (primaryNode == true)
            {
                this->mSecondaryToPrimaryMessageQueue->receive(
                    &receiveData,
                    sizeof(receiveData),
                    receiveLength,
                    priority
                );
            }
            else
            {
                this->mPrimaryToSecondaryMessageQueue->receive(
                    &receiveData,
                    sizeof(receiveData),
                    receiveLength,
                    priority
                );
            }
            this->mReceiveDataQueue.push(receiveData); // Silently drops the message if the spsc buffer is full
            if (this->mCallback) // Guard: invoking an empty std::function throws bad_function_call
            {
                this->mCallback();
            }
        }
        catch (const std::exception& ex)
        {
            std::cout << "Inside Listen Exception\n";
            std::cout << ex.what() << std::endl;
        }
    }
}
/// @brief Creates (or opens) both message queues and starts the listener thread.
/// @param queueName Base name; "_out" / "_in" suffixes are appended for the two directions.
/// @return true on success, false if queue creation or thread start failed.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
{
    try
    {
        // Both roles must use identical names so the two processes rendezvous
        // on the same queue objects. (The original duplicated this assignment
        // in an if/else with identical branches.)
        this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
        this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
        // Element size must describe the type actually carried by each queue.
        // T1 is the type THIS node sends and T2 the type it receives, so:
        //   primary  : sends T1 on "_out", receives T2 on "_in"
        //   secondary: receives T2 on "_out", sends T1 on "_in"
        // BUG FIX: the original always used sizeof(T1) for "_out" and sizeof(T2)
        // for "_in" regardless of role; when sizeof(T1) != sizeof(T2) the
        // secondary node's receive() buffer no longer matched the queue's
        // max_msg_size, producing boost::interprocess_exception (library_error/size_error).
        constexpr std::size_t primaryToSecondaryElementSize = primaryNode ? sizeof(T1) : sizeof(T2);
        constexpr std::size_t secondaryToPrimaryElementSize = primaryNode ? sizeof(T2) : sizeof(T1);
        //Open-Create message queue to send data from primary node to secondary node
        this->mPrimaryToSecondaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
            boost::interprocess::open_or_create,
            this->mPrimaryToSecondaryMessageQueueName.c_str(),
            MAX_MESSAGE_DEPTH,
            primaryToSecondaryElementSize
        );
        //Open-Create message queue to send data from secondary node to primary node
        this->mSecondaryToPrimaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
            boost::interprocess::open_or_create,
            this->mSecondaryToPrimaryMessageQueueName.c_str(),
            MAX_MESSAGE_DEPTH,
            secondaryToPrimaryElementSize
        );
        //Start listener thread
        this->mReceiveThread = std::thread(&Ipc::listen, this);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}
/// @brief Requests listener-thread exit and removes both queues from the system.
/// @return true on success, false if an exception was raised.
/// NOTE(review): the listener thread may still be blocked inside receive()
/// (it only observes the flag after the next message) and is never joined
/// here — a timed_receive plus a join would make shutdown deterministic; confirm.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::close(void)
{
    try
    {
        this->mExitReceiveThread.store(true); //Ask the listener thread to stop
        boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str());//Delete Primary to Secondary Message Queue
        boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str());//Delete Secondary to Primary Message Queue
        return true; // BUG FIX: the original fell off the end of a non-void function (undefined behavior)
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}
/// @brief Posts one message of type T1 on this node's outbound queue
/// (primary -> "_out" queue, secondary -> "_in" queue). Blocks if the queue is full.
/// @param data Message payload (copied byte-wise into the queue).
/// @param priority Message priority (higher is delivered first).
/// @return true if the message was queued, false if an exception occurred.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
{
    try
    {
        // Select the outbound queue for this node's role, then post.
        auto& outboundQueue = primaryNode ? this->mPrimaryToSecondaryMessageQueue
                                          : this->mSecondaryToPrimaryMessageQueue;
        outboundQueue->send(&data, sizeof(data), priority);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}
/// @brief Pops the oldest buffered inbound message, if any.
/// @return The message, or std::nullopt when nothing has been received.
template<typename T1, typename T2, bool primaryNode>
inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
{
    // Guard clause: nothing buffered yet.
    if (this->mReceiveDataQueue.read_available() == 0)
    {
        return std::nullopt;
    }
    T2 value = this->mReceiveDataQueue.front();
    this->mReceiveDataQueue.pop();
    return value;
}
/// @brief Stores the callback that the listener thread invokes after each
/// message is buffered.
/// @return true when the callback was stored, false if assignment threw.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
{
    bool registered = false;
    try
    {
        this->mCallback = callbackImplementation;
        registered = true;
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << '\n';
    }
    return registered;
}
/// @brief Reports whether the listener thread has buffered at least one message.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
{
    // Direct boolean result instead of the original if/else ladder.
    return this->mReceiveDataQueue.read_available() > 0;
}
main.cpp
#include <ipc.hpp>
#include <iostream>
//P1 stands for Process 1
//P2 stands for Process 2
//Payload sent from process 1 to process 2.
//Must stay trivially copyable: it is transferred byte-wise through the message queue.
struct P1ToP2
{
float a;
int b;
};
//Payload sent from process 2 to process 1.
//Must stay trivially copyable: it is transferred byte-wise through the message queue.
struct P2ToP1
{
int a;
int b;
};
Ipc<P1ToP2, P2ToP1, false> ipc1; //Global IPC object
//Callback run by the IPC listener thread whenever a message has been buffered:
//drains one message (if present) and prints its fields.
void message_queue_data_received(void)
{
    if (ipc1.isDataAvailableInReceiveDataQueue())
    {
        if (auto received = ipc1.receive(); received.has_value())
        {
            std::cout << "a : " << received->a << "\tb : " << received->b << std::endl;
        }
    }
}
//Program entry point: registers the IPC callback, opens the queues, then
//spins in the main loop forwarding ethernet data (when flagged) over IPC.
int main(int argc, char *argv[])
{
    bool dataReceivedOnEthernet = false; //Would be set by the (currently commented-out) ethernet code
    ipc1.register_callback(message_queue_data_received);
    ipc1.open("ipc1"); //BUG FIX: original wrote "this->ipc1.open(...)" — 'this' is invalid inside a free function
    while (true)
    {
        if (dataReceivedOnEthernet == true) //Flag set by other thread
        {
            P1ToP2 p;
            p.a = 10.23f; //Some data received over ethernet
            p.b = 10;     //Some data received over ethernet
            ipc1.send(p); //Send data over IPC
        }
        //Other Code
        //NOTE(review): this is a busy-wait loop (100% CPU); consider sleeping or blocking — confirm intent.
    }
}
错误
boost::interprocess_exception::library_error
为什么进程对消息使用不同的类型,同时默默地假设它们具有相同的大小(以及简单和标准布局等......)。您是否混淆了各种类型和队列?看起来是这样的。
如果你能很好地命名事物,这会很有帮助。另外,删除重复项。
我会按消息类型分隔队列。按角色命名:
// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;
然后,通过定义一个 Channel 类型:
using Prio = uint32_t;
template <typename T> struct Channel {
Channel(std::string name);
~Channel();
std::tuple<T, Prio> receive(std::stop_token token);
bool send(T const& msg, Prio prio, std::stop_token token);
private:
using Queue = boost::interprocess::message_queue;
std::string name_;
Queue queue_ = open();
Queue open() {
if constexpr (IsClient) {
return {boost::interprocess::open_only, name_.c_str()};
} else {
return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
}
}
};
现在我们可以简单地说:
Channel<Request> reqQ;
Channel<Response> resQ;
具有类似的构造
Ipc(std::string const& queueName) try
: reqQ(queueName + "_req")
, resQ(queueName + "_res")
, mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
} catch (std::exception const& ex) {
std::cerr << "Ipc: " << ex.what() << std::endl;
throw;
}
监听器将收到的消息排队。类型取决于客户端/服务器模式:
using Incoming = std::conditional_t<IsClient, Response, Request>;
boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;
注意我如何选择更安全的
jthread
并带有停止标记来协调线程退出:
std::jthread mThread;
std::stop_token mToken = mThread.get_stop_token();
void listen(std::stop_token token) {
while (!token.stop_requested()) {
try {
if constexpr (IsClient)
mInbox.push(get<0>(resQ.receive(token)));
else
mInbox.push(get<0>(reqQ.receive(token)));
if (mCallback)
mCallback();
} catch (std::exception const& ex) {
std::cerr << "Listen: " << ex.what() << std::endl;
}
}
}
外部操作看起来简单多了,比如:
void wait() { mThread.join(); }
void close() {
mThread.request_stop();
if (std::this_thread::get_id() != mThread.get_id())
wait();
}
bool send(Request const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }
std::optional<Incoming> consume() {
if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
return val;
return {};
}
void register_callback(callback_t cb) { mCallback = cb; }
bool haveMessage() const { return mInbox.read_available(); }
让我们定义上面的服务器,通过发回a和b/2的平方根来响应
Requests
:
void server() {
ServerIpc ipc(IPC_NAME);
auto handler = [&ipc] {
assert(ipc.haveMessage());
if (std::optional<Request> req = ipc.consume()) {
auto [a, b] = *req;
std::cout << "server received request a:" << a << " b:" << b << std::endl;
if (a == -42 && b == -42) {
std::cout << " -> server handling close request" << std::endl;
ipc.close();
} else {
// send response
ipc.send(Response{sqrt(a), b / 2});
}
}
};
ipc.register_callback(handler);
ipc.wait();
}
仅此而已。请注意我们如何添加一种机制来告诉服务器客户端希望它退出(因为服务器拥有资源)。客户端可能看起来像这样:
void client() {
ClientIpc ipc(IPC_NAME);
auto handler = [&ipc] {
assert(ipc.haveMessage());
if (std::optional<Response> res = ipc.consume()) {
auto [a, b] = *res;
std::cout << "client received response a:" << a << " b:" << b << std::endl;
}
};
ipc.register_callback(handler);
for (int i = 0; i < 100; ++i) {
if (rand() % 30 == 0) // Flag set by other thread
ipc.send(Request{i, 2 * i}); // Send request
std::this_thread::sleep_for(10ms);
}
std::cout << "Client sending close command" << std::endl;
ipc.send(Request{-42, -42});
std::cout << "Closing" << std::endl;
ipc.close();
}
它所做的只是在大约 1 秒内发送几条请求并记录响应。然后它告诉服务器退出并关闭。只有服务器会删除队列。
切换客户端/服务器的简单主线:
int main(int argc, char** argv) {
if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
server();
else
client();
std::cout << "Bye" << std::endl;
}
文件
test.h
#pragma once
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <cassert>
#include <chrono>
#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <thread>
#include <tuple>
using namespace std::chrono_literals;
/// Bidirectional IPC endpoint built on two named boost message queues.
///  - Request  : messages sent client -> server (queue "<name>_req")
///  - Response : messages sent server -> client (queue "<name>_res")
///  - IsClient : role of THIS process; the server owns (creates/removes) the queues.
template <typename Request, typename Response, bool IsClient> class Ipc {
  private:
    static constexpr uint8_t MAX_DEPTH = 5; // capacity of each queue and of the local inbox
    using callback_t = std::function<void()>;
    callback_t mCallback; // invoked on the listener thread after each buffered message
    using Prio = uint32_t;
    // One named message queue carrying messages of exactly type T.
    // The server creates the queue and removes it on destruction; the client
    // only opens an existing queue (open_only), so it fails fast if no server runs.
    template <typename T> struct Channel {
        Channel(std::string name) : name_(std::move(name)) {
            // Guard against two processes opening the same name with different message types.
            assert(queue_.get_max_msg_size() == sizeof(T));
        }
        ~Channel() {
            if (!IsClient) { // only the owning (server) side removes the shared queue
                std::cerr << "Server cleaning up " << name_ << std::endl;
                Queue::remove(name_.c_str());
            }
        }
        // Blocking receive with cooperative cancellation: polls in 50ms slices so a
        // stop request is honoured promptly. Throws runtime_error when stopped.
        std::tuple<T, Prio> receive(std::stop_token token) {
            size_t len = 0;
            Prio prio = 0;
            T msg{};
            while (!token.stop_requested()) {
                auto deadline = std::chrono::steady_clock::now() + 50ms;
                if (queue_.timed_receive(&msg, sizeof(msg), len, prio, deadline)) {
                    assert(len == sizeof(T));
                    return {std::move(msg), prio};
                }
            }
            throw std::runtime_error("stop requested");
        }
        // Blocking send with the same 50ms cancellation slices; false when stopped.
        bool send(T const& msg, Prio prio, std::stop_token token) {
            while (!token.stop_requested()) {
                auto deadline = std::chrono::steady_clock::now() + 50ms;
                if (queue_.timed_send(&msg, sizeof(msg), prio, deadline))
                    return true;
            }
            return false;
        }
      private:
        using Queue = boost::interprocess::message_queue;
        std::string name_;
        Queue queue_ = open(); // uses name_, which is declared (and therefore initialized) first
        Queue open() {
            if constexpr (IsClient) {
                return {boost::interprocess::open_only, name_.c_str()};
            } else {
                return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
            }
        }
    };
    Channel<Request> reqQ;
    Channel<Response> resQ;
    // What this role receives: clients get Responses, servers get Requests.
    using Incoming = std::conditional_t<IsClient, Response, Request>;
    // BUG FIX: mInbox is declared BEFORE mThread so it is fully constructed
    // before the listener thread (which pushes into it) can start. The original
    // declaration order started the thread first — a construction-time race.
    boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;
    std::jthread mThread;
    std::stop_token mToken = mThread.get_stop_token(); // must stay declared after mThread
    // Listener loop: receives from the role's inbound channel, buffers the
    // message and fires the callback; exits when the jthread's stop is requested.
    void listen(std::stop_token token) {
        while (!token.stop_requested()) {
            try {
                if constexpr (IsClient)
                    mInbox.push(get<0>(resQ.receive(token)));
                else
                    mInbox.push(get<0>(reqQ.receive(token)));
                if (mCallback)
                    mCallback();
            } catch (std::exception const& ex) {
                std::cerr << "Listen: " << ex.what() << std::endl;
            }
        }
    }
  public:
    // Opens both channels and starts the listener; logs and rethrows on failure.
    // (Lambda replaces the original unqualified bind(), which relied on ADL.)
    Ipc(std::string const& queueName) try
        : reqQ(queueName + "_req")
        , resQ(queueName + "_res")
        , mThread([this](std::stop_token token) { listen(token); }) {
    } catch (std::exception const& ex) {
        std::cerr << "Ipc: " << ex.what() << std::endl;
        throw;
    }
    // Blocks until the listener thread exits.
    void wait() { mThread.join(); }
    // Requests listener shutdown; joins unless called from the listener itself
    // (e.g. from inside the callback), which would self-deadlock.
    void close() {
        mThread.request_stop();
        if (std::this_thread::get_id() != mThread.get_id())
            wait();
    }
    bool send(Request const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
    bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }
    // Pops one buffered inbound message, if any.
    std::optional<Incoming> consume() {
        if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
            return val;
        return {};
    }
    void register_callback(callback_t cb) { mCallback = cb; }
    bool haveMessage() const { return mInbox.read_available(); }
};
文件
test.cpp
#include "test.h"
#include <cmath>
#include <set>
#include <boost/asio.hpp>
// Signal set registered for SIGTERM/SIGINT.
// NOTE(review): no async_wait() is ever attached here, so this object appears
// to have no observable effect — confirm whether it was meant to drive shutdown.
static boost::asio::signal_set signals(boost::asio::system_executor{}, SIGTERM, SIGINT);
// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };
// Role aliases: the client opens existing queues, the server creates/removes them.
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;
// Shared base name for the two message queues ("_req"/"_res" suffixes are appended).
static std::string IPC_NAME = "so_demo_ipc";
// Server role: answers each Request{a,b} with Response{sqrt(a), b/2}.
// The magic request {-42,-42} is treated as a shutdown command.
void server() {
    ServerIpc ipc(IPC_NAME);
    ipc.register_callback([&ipc] {
        assert(ipc.haveMessage());
        std::optional<Request> req = ipc.consume();
        if (!req)
            return;
        auto [a, b] = *req;
        std::cout << "server received request a:" << a << " b:" << b << std::endl;
        if (a == -42 && b == -42) {
            std::cout << " -> server handling close request" << std::endl;
            ipc.close();
            return;
        }
        // send response
        ipc.send(Response{sqrt(a), b / 2});
    });
    ipc.wait();
}
// Client role: fires occasional requests for about one second, logs every
// response, then tells the server to shut down and closes its own endpoint.
void client() {
    ClientIpc ipc(IPC_NAME);
    ipc.register_callback([&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Response> res = ipc.consume()) {
            auto [a, b] = *res;
            std::cout << "client received response a:" << a << " b:" << b << std::endl;
        }
    });
    for (int i = 0; i < 100; ++i) {
        // Randomly pretend data arrived (stand-in for the "other thread" flag).
        if (rand() % 30 == 0)
            ipc.send(Request{i, 2 * i});
        std::this_thread::sleep_for(10ms);
    }
    std::cout << "Client sending close command" << std::endl;
    ipc.send(Request{-42, -42}); // magic shutdown message understood by the server
    std::cout << "Closing" << std::endl;
    ipc.close();
}
int main(int argc, char** argv) {
if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
server();
else
client();
std::cout << "Bye" << std::endl;
}
本地现场演示:
¹ 遗憾的是,在线编译器不允许共享内存访问