如何保证不同共享内存块之间的数据可用性

问题描述 投票:0回答:0

我使用 mmap 映射文件以在进程之间共享数据,并使用另一块共享内存在数据准备好时通知读取进程。在这块共享内存中创建了一个用于通知的信号量(下面的示例代码中实际使用的是进程间互斥量加条件变量)。

当数据写入所有文件后,我立即使用信号量来通知数据已准备好。

reader 进程收到通知后,是否有可能仍然读不到某些文件中刚写入的数据?

下面是一个尝试模仿这种情况的示例代码,似乎一旦收到通知,数据就始终可供读者使用。但我不确定这是否只是因为我运行的时间不够长。(我的一些同事声称,他们在使用与示例代码类似的模式时,偶尔会遇到数据未更新的问题)

有没有什么理论或者官方文档可以证明下面的代码没问题?

#include <cstddef>
#include <cstdint>
#include <fstream>
#include <ios>
#include <iostream>
#include <new>
#include <optional>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
// Notification record that is constructed inside a shared-memory mapping.
// The writer publishes a value of type T; readers block until the next
// publication and then read the value.
// NOTE(review): the object is used directly from several processes, so T is
// presumably expected to be trivially copyable and pointer-free — confirm
// before instantiating with anything other than plain integers.
template <class T> struct SharedData {
    SharedData(const T &init_value) : data_(init_value) {}
    // Mutex protecting data_ and generation_
    boost::interprocess::interprocess_mutex mutex_;
    // Condition signalled after every publication
    boost::interprocess::interprocess_condition cond_;
    // Most recently published value (init_value until the first publication)
    T data_;
    // Publication counter, incremented under mutex_ by every set_value().
    // Readers wait on a change of this counter instead of on a bare wake-up,
    // so a spurious wake-up can never be mistaken for a publication and
    // publishing the same value twice is still observed as two events.
    std::uint64_t generation_ = 0;
};

// Owner side of the notification channel. Creates (or opens) the named
// shared-memory object, sizes it to hold one SharedData<T>, constructs the
// record in place, and removes the named object again on destruction.
template <class T> class SharedDataWriter {
public:
    // name:       name of the shared-memory object
    // init_value: value stored in the record before the first set_value()
    SharedDataWriter(const std::string& name, const T &init_value)
    : shm_(boost::interprocess::open_or_create, name.c_str(), boost::interprocess::read_write)
    , name_(name) {
        shm_.truncate(sizeof(SharedData<T>));
        // The region can only be mapped after the object has its final size,
        // hence the optional that is filled in here rather than in the
        // member-initializer list.
        region_.emplace(shm_, boost::interprocess::read_write);
        void *addr = region_->get_address();
        shared_data_ = new (addr) SharedData<T>(init_value);
    }
    ~SharedDataWriter() {
        boost::interprocess::shared_memory_object::remove(name_.c_str());
    }

    // Publishes `value` and wakes every reader blocked in wait_value().
    // The value, the generation bump and the notification all happen while
    // holding the record's mutex.
    void set_value(const T& value) {
        boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(shared_data_->mutex_);
        shared_data_->data_ = value;
        ++shared_data_->generation_;
        shared_data_->cond_.notify_all();
    }
private:
    boost::interprocess::shared_memory_object shm_;
    std::optional<boost::interprocess::mapped_region> region_;
    std::string name_;
    SharedData<T>* shared_data_ = nullptr;
};

// Consumer side of the notification channel. Opens an already existing
// shared-memory object and treats its start as a SharedData<T> that the
// writer has constructed there.
template <class T> class SharedDataReader {
public:
    // name: name of the shared-memory object created by the writer
    SharedDataReader(const std::string& name)
    : shm_(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write)
    , region_(shm_, boost::interprocess::read_write) {
        shared_data_ = static_cast<SharedData<T> *>(region_.get_address());
    }

    // Blocks until the writer publishes a value after this call has started,
    // then returns the latest published value.
    //
    // The wait uses a predicate on the generation counter: a wake-up that is
    // not caused by set_value() simply goes back to waiting, so the initial
    // (never published) value cannot be returned by accident. The "seen"
    // generation is local to the call, which keeps the method usable from
    // several threads sharing one reader. A publication that completed before
    // the call started is not reported, and several publications between two
    // calls collapse into the most recent value.
    T wait_value() {
        boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(
            shared_data_->mutex_);
        const std::uint64_t seen = shared_data_->generation_;
        shared_data_->cond_.wait(lock, [this, seen] {
            return shared_data_->generation_ != seen;
        });
        return shared_data_->data_;
    }
private:
    boost::interprocess::shared_memory_object shm_;
    boost::interprocess::mapped_region region_;
    SharedData<T> *shared_data_ = nullptr;
};
// Each table file begins with this fixed-size header; the int32_t cell
// values follow directly after it.
struct FixedHeader {
    int32_t col_size_;  // number of values stored per row
    int32_t row_size_;  // number of rows

    // Given the address `begin` at which a header is located, returns the
    // address of the first byte after that header, i.e. where the values start.
    void * get_value_start_addr(char * begin) {
        char * const first_value = &begin[sizeof(FixedHeader)];
        return first_value;
    }
};

struct SingleMappedFile {
    SingleMappedFile(const std::string& filename)
    : mapped_file_(filename.data(), boost::interprocess::read_write)
    , region_(mapped_file_, boost::interprocess::read_write) {}
    FixedHeader& get_header() {
        return *((FixedHeader*)get_start_addr());
    }
    void write(int32_t row_index, int32_t value) {
        int32_t * value_start = (int32_t *)get_value_start_addr();
        for (int32_t i = 0; i < get_header().col_size_; ++i) {
            value_start[row_index * get_header().col_size_ + i] = value;
        }
    }
    int32_t* get_row(int32_t row_index) {
        int32_t * value_start = (int32_t *)get_value_start_addr();
        return value_start + row_index * get_header().col_size_;
    }
    void * get_start_addr() const { return region_.get_address(); }
    const char * get_file_name() const { return mapped_file_.get_name(); }
private:
    char * get_value_start_addr() {
        return (char*)get_header().get_value_start_addr((char*)get_start_addr());
    }
    boost::interprocess::file_mapping mapped_file_;
    boost::interprocess::mapped_region region_;
};

void writer_main() {
    // create 50 files and set init value
    for (size_t i = 0; i < 50; ++i) {
        std::ofstream file(std::string("file_") + std::to_string(i), std::ios_base::out | std::ios_base::binary);
        int32_t col = 5000;
        int32_t row = 300;
        file.write((char*)&col, sizeof(col));
        file.write((char*)&row, sizeof(row));
        for (int32_t k = 0; k < col * row; ++k) file.write((char*)&k, sizeof(k));
    }

    // mmap all these 50 files
    std::vector<SingleMappedFile> mapped_files;
    for (size_t i = 0; i < 50; ++i) {
        mapped_files.emplace_back(std::string("file_") + std::to_string(i));
    }

    SharedDataWriter<int32_t> writer("notify", -1);

    int32_t value = 1;
    int32_t row_index = 0;
    while(true) {
        char c;
        std::cout << "input anything to write data." << std::endl;
        std::cin >> c;
        // write data into mapped files and notify
        for (auto& mfile : mapped_files) {
            mfile.write(row_index, value);
        }
        writer.set_value(row_index);
        row_index++;
    }
}

// Reader worker: repeatedly waits for a row index from `reader` and verifies
// that every value of that row in `mfile` equals 1 (the value the writer
// stores into updated rows). Never returns normally.
// Throws std::runtime_error("data not updated.") on the first value that
// does not match.
void thread_func(SharedDataReader<int32_t>& reader, SingleMappedFile& mfile) {
    while (true) {
        const int32_t row_index = reader.wait_value();
        const FixedHeader& header = mfile.get_header();
        // The value comes from another process and the channel starts out at
        // -1; never turn an index outside the table into a pointer.
        if (row_index < 0 || row_index >= header.row_size_) {
            continue;
        }
        const int32_t* row = mfile.get_row(row_index);
        // Scan from the last column backwards, as the original check did
        // (presumably because the writer fills the row front to back, so the
        // tail is where a stale value would show up first).
        for (int32_t i = header.col_size_; i > 0; --i) {
            if (row[i - 1] != 1) {
                throw std::runtime_error("data not updated.");
            }
        }
        // Build the whole line first and emit it with one insertion so that
        // output from the worker threads is not interleaved mid-line.
        const std::string line = "row_index = " + std::to_string(row_index) + "\n";
        std::cout << line << std::flush;
    }
}

void reader_main() {
    std::vector<SingleMappedFile> mapped_files;
    for (size_t i = 0; i < 50; ++i) {
        mapped_files.emplace_back(std::string("file_") + std::to_string(i));
    }
    SharedDataReader<int32_t> reader("notify");
    std::vector<std::thread> threads;
    for (auto& mfile : mapped_files) {
        threads.emplace_back([&reader, &mfile]() { thread_func(reader, mfile); });
    }
    for (auto& t : threads) t.join();
}

// Entry point: started with at least one command-line argument the program
// acts as the reader; started without arguments it acts as the writer.
int main(int argc, char **) {
    const bool run_as_reader = argc > 1;
    if (!run_as_reader) {
        writer_main();
        return 0;
    }
    reader_main();
    return 0;
}
c++ linux ipc shared-memory
© www.soinside.com 2019 - 2024. All rights reserved.