用于线程间通信的邮箱的C++实现

问题描述 投票:0回答:3

我想知道是否有人以前使用 POSIX 库实现了用于线程间通信的邮箱类。作为参考,我看起来类似于 SystemVerilog 中使用的邮箱:http://www.asic-world.com/systemverilog/sema_mail_events2.html

编辑

我尝试使用 STL 队列、pthread 条件和互斥体来创建邮箱。它尝试复制链接中描述的 SystemVerilog 邮箱的行为:

#include <cerrno>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <queue>

#include <fcntl.h>
#include <pthread.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

using namespace std;

class Mailbox{

    private:
        pthread_cond_t  msg_available;  // Message in the mailbox?
        pthread_mutex_t queue_mutex;    // Mutex for queue control

        queue<messageT> messages;       // Messages

    public:
        // Constructor
        Mailbox(void){
            msg_available = PTHREAD_COND_INITIALIZER;
            queue_mutex = PTHREAD_MUTEX_INITIALIZER;
        }
        // Destructor
        ~Mailbox(void){
            // Nothing to do here
        }

        // Put a single message into the mailbox
        void put(messageT msg){

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            // Push message into mailbox
            messages.push(msg);

            // Signal there is a message in the mailbox
            if(pthread_cond_signal(&msg_available)){                    
                fprintf(stderr, "cond error");
                exit(EXIT_FAILURE);
            }

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }
        }

        // Try to put a single message into the mailbox
        int try_put(messageT msg){

            // Try to lock down queue
            if(pthread_mutex_trylock(queue_mutex) == 0){

                // Push message into mailbox
                messages.push(msg);

                // Signal there is a message in the mailbox
                if(pthread_cond_signal(&msg_available)){                    
                    fprintf(stderr, "cond error");
                    exit(EXIT_FAILURE);
                }

                // Unlock queue
                if(pthread_mutex_unlock(queue_mutex)){
                    fprintf(stderr, "Queue Mutex Unlock Error\n");
                    exit(EXIT_FAILURE);
                }

                return 1;
            }
            // Otherwise, say mailbox is unavailable
            else
                return 0;
        }

        //  Get single message from a mailbox
        void get(mesageT *msg){

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            // Wait for a message to come in
            while(messages.empty()){
                // Release hold of the lock until another thread
                // signals that a message has been placed
                if(pthread_cond_wait(&msg_available,&queue_mutex)){                 
                    fprintf(stderr, "cond_wait error");
                    exit(EXIT_FAILURE);
                }
            }

            // Pop of least recent message
            *msg = messages.front();
            messages.pop();

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }

        }

        //  Try to get single message from a mailbox
        int try_get(mesageT *msg){

            int mailbox_ready = 1;  // Mailbox ready

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            // Indicate if mailbox is empty
            if(messages.empty())
                mailbox_ready = 0
            // Otherwise, grab the message
            else {
                // Pop of least recent message
                *msg = messages.front();
                messages.pop();
            }

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }

            return mailbox_ready;
        }

        //  Peek at single message from a mailbox
        void peek(mesageT *msg){

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            // Wait for a message to come in
            while(messages.empty()){
                // Release hold of the lock until another thread
                // signals that a message has been placed
                if(pthread_cond_wait(&msg_available,&queue_mutex)){                 
                    fprintf(stderr, "cond_wait error");
                    exit(EXIT_FAILURE);
                }
            }

            // Peek at most recent message
            *msg = messages.front();

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }

        }

        //  Try to peek at single message from a mailbox
        int try_peek(mesageT *msg){

            int mailbox_ready = 1;  // Mailbox ready

            // Lock down queue
            if(pthread_mutex_lock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Lock Error\n");
                exit(EXIT_FAILURE);
            }

            if(messages.empty())    // Indicate if mailbox is empty
                mailbox_ready = 0
            else                    // Otherwise, grab the message
                *msg = messages.front();

            // Unlock queue
            if(pthread_mutex_unlock(queue_mutex)){
                fprintf(stderr, "Queue Mutex Unlock Error\n");
                exit(EXIT_FAILURE);
            }

            return mailbox_ready;
        }
}
c++ multithreading unix parallel-processing pthreads
3个回答
3
投票

一个简单的、信号量保护的队列就足够了。

如果您希望能够将不同类型的数据放入“邮箱”中,则使用一个通用的基本结构,该结构可以轻松扩展并包含一个整数来说明它是什么类型的结构,然后将其类型转换为当您需要使用它时,正确的结构(取决于嵌入类型)。


2
投票

如果线程都在同一个进程中,您应该使用

pthread_mutex
pthread_condition_variable
,而不是 semaphore
。 Unix 信号量允许进程间同步,但它们在进程内效率较低,而且语义比互斥锁和条件变量更难推理。

这里有一些在 C 或 C++ 中使用互斥体和条件变量的实现:

  • 这是在 C++ 中实现有界缓冲区的正确方法吗
  • http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
  • C++11线程安全队列

0
投票
我开发了一个在我喜欢的信号量中使用哈希表的解决方案。希望您觉得它有用。

邮箱.h

#ifndef MAILBOX_H #define MAILBOX_H #include <map> class Mailbox { public: static Mailbox& GetInstance() { static Mailbox instance; return instance; } void sendMail(int recipient, std::string message); std::string getMail(int recipient); bool checkMail(int recipient); private: Mailbox(); ~Mailbox(); Mailbox(const Mailbox&) = delete; Mailbox& operator=(const Mailbox&) = delete; std::map<int, std::string> mailbox; }; #endif
邮箱.cpp

#include "Mailbox.h" Mailbox::Mailbox() = default; Mailbox::~Mailbox() = default; void Mailbox::sendMail(int recipient, std::string message) { mailbox.insert(std::pair<int, std::string>(recipient, message)); } std::string Mailbox::getMail(int recipient) { std::string message = mailbox[recipient]; mailbox.erase(recipient); return message; } bool Mailbox::checkMail(int recipient) { std::map<int, std::string>::iterator it; it = mailbox.find(recipient); if (it != mailbox.end()) { return true; } return false; }
    
© www.soinside.com 2019 - 2024. All rights reserved.