我想知道是否有人曾经使用 POSIX 库实现过用于线程间通信的邮箱类。作为参考,我想要的是类似于 SystemVerilog 中所用邮箱的东西:http://www.asic-world.com/systemverilog/sema_mail_events2.html
编辑:
我尝试使用 STL 队列、pthread 条件变量和互斥锁来创建邮箱。它试图复现链接中描述的 SystemVerilog 邮箱的行为:
#include <cerrno>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <queue>
#include <fcntl.h>
#include <pthread.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
using namespace std;
class Mailbox{
private:
pthread_cond_t msg_available; // Message in the mailbox?
pthread_mutex_t queue_mutex; // Mutex for queue control
queue<messageT> messages; // Messages
public:
// Constructor
Mailbox(void){
msg_available = PTHREAD_COND_INITIALIZER;
queue_mutex = PTHREAD_MUTEX_INITIALIZER;
}
// Destructor
~Mailbox(void){
// Nothing to do here
}
// Put a single message into the mailbox
void put(messageT msg){
// Lock down queue
if(pthread_mutex_lock(queue_mutex)){
fprintf(stderr, "Queue Mutex Lock Error\n");
exit(EXIT_FAILURE);
}
// Push message into mailbox
messages.push(msg);
// Signal there is a message in the mailbox
if(pthread_cond_signal(&msg_available)){
fprintf(stderr, "cond error");
exit(EXIT_FAILURE);
}
// Unlock queue
if(pthread_mutex_unlock(queue_mutex)){
fprintf(stderr, "Queue Mutex Unlock Error\n");
exit(EXIT_FAILURE);
}
}
// Try to put a single message into the mailbox
int try_put(messageT msg){
// Try to lock down queue
if(pthread_mutex_trylock(queue_mutex) == 0){
// Push message into mailbox
messages.push(msg);
// Signal there is a message in the mailbox
if(pthread_cond_signal(&msg_available)){
fprintf(stderr, "cond error");
exit(EXIT_FAILURE);
}
// Unlock queue
if(pthread_mutex_unlock(queue_mutex)){
fprintf(stderr, "Queue Mutex Unlock Error\n");
exit(EXIT_FAILURE);
}
return 1;
}
// Otherwise, say mailbox is unavailable
else
return 0;
}
// Get single message from a mailbox
void get(mesageT *msg){
// Lock down queue
if(pthread_mutex_lock(queue_mutex)){
fprintf(stderr, "Queue Mutex Lock Error\n");
exit(EXIT_FAILURE);
}
// Wait for a message to come in
while(messages.empty()){
// Release hold of the lock until another thread
// signals that a message has been placed
if(pthread_cond_wait(&msg_available,&queue_mutex)){
fprintf(stderr, "cond_wait error");
exit(EXIT_FAILURE);
}
}
// Pop of least recent message
*msg = messages.front();
messages.pop();
// Unlock queue
if(pthread_mutex_unlock(queue_mutex)){
fprintf(stderr, "Queue Mutex Unlock Error\n");
exit(EXIT_FAILURE);
}
}
// Try to get single message from a mailbox
int try_get(mesageT *msg){
int mailbox_ready = 1; // Mailbox ready
// Lock down queue
if(pthread_mutex_lock(queue_mutex)){
fprintf(stderr, "Queue Mutex Lock Error\n");
exit(EXIT_FAILURE);
}
// Indicate if mailbox is empty
if(messages.empty())
mailbox_ready = 0
// Otherwise, grab the message
else {
// Pop of least recent message
*msg = messages.front();
messages.pop();
}
// Unlock queue
if(pthread_mutex_unlock(queue_mutex)){
fprintf(stderr, "Queue Mutex Unlock Error\n");
exit(EXIT_FAILURE);
}
return mailbox_ready;
}
// Peek at single message from a mailbox
void peek(mesageT *msg){
// Lock down queue
if(pthread_mutex_lock(queue_mutex)){
fprintf(stderr, "Queue Mutex Lock Error\n");
exit(EXIT_FAILURE);
}
// Wait for a message to come in
while(messages.empty()){
// Release hold of the lock until another thread
// signals that a message has been placed
if(pthread_cond_wait(&msg_available,&queue_mutex)){
fprintf(stderr, "cond_wait error");
exit(EXIT_FAILURE);
}
}
// Peek at most recent message
*msg = messages.front();
// Unlock queue
if(pthread_mutex_unlock(queue_mutex)){
fprintf(stderr, "Queue Mutex Unlock Error\n");
exit(EXIT_FAILURE);
}
}
// Try to peek at single message from a mailbox
int try_peek(mesageT *msg){
int mailbox_ready = 1; // Mailbox ready
// Lock down queue
if(pthread_mutex_lock(queue_mutex)){
fprintf(stderr, "Queue Mutex Lock Error\n");
exit(EXIT_FAILURE);
}
if(messages.empty()) // Indicate if mailbox is empty
mailbox_ready = 0
else // Otherwise, grab the message
*msg = messages.front();
// Unlock queue
if(pthread_mutex_unlock(queue_mutex)){
fprintf(stderr, "Queue Mutex Unlock Error\n");
exit(EXIT_FAILURE);
}
return mailbox_ready;
}
}
一个简单的、信号量保护的队列就足够了。
如果您希望能够将不同类型的数据放入“邮箱”中,可以使用一个通用的基本结构:该结构易于扩展,并包含一个整数来说明它实际是哪种类型的结构;当您需要使用它时,再根据这个嵌入的类型字段将其强制转换为正确的结构。
如果线程都在同一个进程中,您应该使用 pthread 互斥锁(pthread_mutex_t)和 pthread 条件变量(pthread_cond_t),而不是信号量(semaphore)。Unix 信号量允许进程间同步,但它们在进程内效率较低,而且语义比互斥锁和条件变量更难推理。这里有一些在 C 或 C++ 中使用互斥锁和条件变量的实现:
Mailbox.h
#ifndef MAILBOX_H
#define MAILBOX_H
#include <map>
#include <string>

// Process-wide mailbox that stores at most one text message per integer
// recipient id.
//
// The single instance is obtained through GetInstance(); the class cannot be
// constructed, copied or destroyed by client code. None of the member
// functions take a lock, so concurrent callers must provide their own
// synchronisation.
class Mailbox
{
public:
    // Returns the single shared instance, created on first use.
    static Mailbox& GetInstance()
    {
        static Mailbox instance;
        return instance;
    }

    // Stores `message` for `recipient`.
    void sendMail(int recipient, std::string message);

    // Returns the message stored for `recipient` and removes it.
    std::string getMail(int recipient);

    // Returns true if a message is currently stored for `recipient`.
    bool checkMail(int recipient);

private:
    // Construction and destruction are reserved for GetInstance().
    Mailbox();
    ~Mailbox();

    // Singleton: copying is disabled.
    Mailbox(const Mailbox&) = delete;
    Mailbox& operator=(const Mailbox&) = delete;

    // Stored messages, keyed by recipient id.
    std::map<int, std::string> mailbox;
};
#endif
Mailbox.cpp
#include "Mailbox.h"
// Out-of-line defaulted constructor: the message map starts out empty.
Mailbox::Mailbox() = default;
// Out-of-line defaulted destructor: the message map releases its own storage.
Mailbox::~Mailbox() = default;
// Stores `message` for `recipient`.
// If `recipient` already has a stored message, the map is left unchanged and
// the new message is discarded.
void Mailbox::sendMail(int recipient, std::string message)
{
    mailbox.emplace(recipient, message);
}
// Removes and returns the message stored for `recipient`.
// Returns an empty string when nothing is stored for `recipient`; either way
// no entry for `recipient` remains afterwards.
std::string Mailbox::getMail(int recipient)
{
    std::string message;
    // Look the entry up once: operator[] would first insert an empty entry
    // for an unknown recipient only for it to be erased again.
    std::map<int, std::string>::iterator it = mailbox.find(recipient);
    if (it != mailbox.end())
    {
        // Take over the stored text instead of copying it.
        message.swap(it->second);
        mailbox.erase(it);
    }
    return message;
}
// Reports whether a message is currently stored for `recipient`.
// The mailbox is not modified.
bool Mailbox::checkMail(int recipient)
{
    return mailbox.count(recipient) != 0;
}