我使用 mmap 映射文件,以便在进程之间共享数据;另外再使用一块共享内存,在数据准备好时通知其他进程。通知机制(信号量;下面的示例代码中实际使用的是进程间互斥量加条件变量)就创建在这块共享内存中。
当数据写入所有文件后,我立即使用信号量来通知数据已准备好。
reader 进程收到通知后,是否有可能仍然读不到某些文件中刚刚写入的数据?
下面是一段尝试模拟这种情况的示例代码。从运行结果看,一旦收到通知,reader 进程似乎总能读到最新的数据,但我不确定这是否只是因为运行的时间还不够长。(我的一些同事声称,他们使用与示例代码类似的模式时,偶尔会遇到数据未更新的问题。)
有没有什么理论依据或者官方文档可以证明下面的代码是正确的?
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <fstream>
#include <ios>
#include <iostream>
#include <new>
#include <optional>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
/// Control block constructed at the start of the shared-memory segment.
///
/// The writer stores a value in it and signals the condition variable; readers
/// block on the condition variable until a new value has been published.
/// Both sides must be compiled against this exact definition because the
/// object is accessed in place through the mapping.
template <class T> struct SharedData {
  SharedData(const T &init_value) : data_(init_value) {}

  /// Guards data_ and sequence_.
  boost::interprocess::interprocess_mutex mutex_;
  /// Signalled with notify_all() after every publication.
  boost::interprocess::interprocess_condition cond_;
  /// Most recently published value.
  T data_;
  /// Incremented on every publication. Readers use it as the wait predicate so
  /// that a spurious wakeup is never mistaken for a new value.
  std::uint64_t sequence_ = 0;
};

/// Creates the shared-memory segment `name`, constructs a SharedData<T> in it
/// and publishes values to readers. The segment name is removed on
/// destruction.
template <class T> class SharedDataWriter {
public:
  /// @param name        name of the shared-memory object to open or create.
  /// @param init_value  value stored in the control block before the first
  ///                    call to set_value().
  ///
  /// NOTE(review): because of open_or_create an already existing segment is
  /// reused and the control block is constructed over it again; presumably the
  /// writer is expected to start before any reader attaches -- confirm.
  SharedDataWriter(const std::string &name, const T &init_value)
      : shm_(boost::interprocess::open_or_create, name.c_str(),
             boost::interprocess::read_write),
        name_(name) {
    shm_.truncate(sizeof(SharedData<T>));
    region_.emplace(shm_, boost::interprocess::read_write);
    void *addr = region_->get_address();
    shared_data_ = new (addr) SharedData<T>(init_value);
  }

  ~SharedDataWriter() {
    boost::interprocess::shared_memory_object::remove(name_.c_str());
  }

  // The object owns the segment name; copying it would remove the name twice.
  SharedDataWriter(const SharedDataWriter &) = delete;
  SharedDataWriter &operator=(const SharedDataWriter &) = delete;

  /// Publishes `value`: stores it, bumps the sequence counter and wakes every
  /// waiting reader, all while holding the mutex.
  void set_value(const T &value) {
    boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>
        lock(shared_data_->mutex_);
    shared_data_->data_ = value;
    ++shared_data_->sequence_;
    shared_data_->cond_.notify_all();
  }

private:
  boost::interprocess::shared_memory_object shm_;
  // Mapped in the constructor body, after the segment has been sized.
  std::optional<boost::interprocess::mapped_region> region_;
  std::string name_;
  SharedData<T> *shared_data_ = nullptr;
};

/// Attaches to an existing shared-memory segment created by SharedDataWriter
/// and waits for published values.
template <class T> class SharedDataReader {
public:
  /// @param name  name of the shared-memory object; it must already exist
  ///              (open_only).
  SharedDataReader(const std::string &name)
      : shm_(boost::interprocess::open_only, name.c_str(),
             boost::interprocess::read_write),
        region_(shm_, boost::interprocess::read_write) {
    shared_data_ = static_cast<SharedData<T> *>(region_.get_address());
  }

  /// Blocks until the writer publishes a value after this call was entered
  /// and returns that value.
  ///
  /// The sequence counter is sampled under the lock and used as the wait
  /// predicate, so spurious wakeups do not return stale data. A value
  /// published while no thread is inside wait_value() is not reported by a
  /// later call; the call waits for the next publication.
  T wait_value() {
    boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>
        lock(shared_data_->mutex_);
    const std::uint64_t seen = shared_data_->sequence_;
    shared_data_->cond_.wait(
        lock, [this, seen] { return shared_data_->sequence_ != seen; });
    return shared_data_->data_;
  }

private:
  boost::interprocess::shared_memory_object shm_;
  boost::interprocess::mapped_region region_;
  SharedData<T> *shared_data_ = nullptr;
};
// Header found at the very beginning of a table file: two 32-bit fields,
// immediately followed by the table values.
struct FixedHeader {
  int32_t col_size_;
  int32_t row_size_;

  // Given the address of the header (`begin`), returns the address of the
  // first byte after it, i.e. where the values start.
  void *get_value_start_addr(char *begin) {
    char *const first_value = begin + sizeof(FixedHeader);
    return first_value;
  }
};
struct SingleMappedFile {
SingleMappedFile(const std::string& filename)
: mapped_file_(filename.data(), boost::interprocess::read_write)
, region_(mapped_file_, boost::interprocess::read_write) {}
FixedHeader& get_header() {
return *((FixedHeader*)get_start_addr());
}
void write(int32_t row_index, int32_t value) {
int32_t * value_start = (int32_t *)get_value_start_addr();
for (int32_t i = 0; i < get_header().col_size_; ++i) {
value_start[row_index * get_header().col_size_ + i] = value;
}
}
int32_t* get_row(int32_t row_index) {
int32_t * value_start = (int32_t *)get_value_start_addr();
return value_start + row_index * get_header().col_size_;
}
void * get_start_addr() const { return region_.get_address(); }
const char * get_file_name() const { return mapped_file_.get_name(); }
private:
char * get_value_start_addr() {
return (char*)get_header().get_value_start_addr((char*)get_start_addr());
}
boost::interprocess::file_mapping mapped_file_;
boost::interprocess::mapped_region region_;
};
void writer_main() {
// create 50 files and set init value
for (size_t i = 0; i < 50; ++i) {
std::ofstream file(std::string("file_") + std::to_string(i), std::ios_base::out | std::ios_base::binary);
int32_t col = 5000;
int32_t row = 300;
file.write((char*)&col, sizeof(col));
file.write((char*)&row, sizeof(row));
for (int32_t k = 0; k < col * row; ++k) file.write((char*)&k, sizeof(k));
}
// mmap all these 50 files
std::vector<SingleMappedFile> mapped_files;
for (size_t i = 0; i < 50; ++i) {
mapped_files.emplace_back(std::string("file_") + std::to_string(i));
}
SharedDataWriter<int32_t> writer("notify", -1);
int32_t value = 1;
int32_t row_index = 0;
while(true) {
char c;
std::cout << "input anything to write data." << std::endl;
std::cin >> c;
// write data into mapped files and notify
for (auto& mfile : mapped_files) {
mfile.write(row_index, value);
}
writer.set_value(row_index);
row_index++;
}
}
/// Reader thread body: waits for each notification and verifies that the
/// announced row of `mfile` has been fully updated (every value equals 1).
///
/// Never returns normally. The exceptions below are not caught here; when
/// this function is the entry point of a std::thread, an exception escaping
/// it ends the process via std::terminate.
/// @throws std::runtime_error if the announced row index is not a valid row
///         of the table, or if a value of the row is not 1.
void thread_func(SharedDataReader<int32_t> &reader, SingleMappedFile &mfile) {
  // wait for notification and check if data is updated
  while (true) {
    const int32_t row_index = reader.wait_value();
    const FixedHeader &header = mfile.get_header();
    // Never dereference a row the table does not contain (for example the
    // initial -1 stored in the control block).
    if (row_index < 0 || row_index >= header.row_size_) {
      throw std::runtime_error("invalid row index.");
    }
    const int32_t *row = mfile.get_row(row_index);
    // Checked from the last column down to the first; the writer fills the
    // columns in ascending order.
    for (int32_t i = header.col_size_; i > 0; --i) {
      if (row[i - 1] != 1) {
        throw std::runtime_error("data not updated.");
      }
    }
    // Build the line first so it reaches std::cout in a single insertion and
    // does not interleave with the output of the other reader threads.
    std::cout << "row_index = " + std::to_string(row_index) + "\n" << std::flush;
  }
}
void reader_main() {
std::vector<SingleMappedFile> mapped_files;
for (size_t i = 0; i < 50; ++i) {
mapped_files.emplace_back(std::string("file_") + std::to_string(i));
}
SharedDataReader<int32_t> reader("notify");
std::vector<std::thread> threads;
for (auto& mfile : mapped_files) {
threads.emplace_back([&reader, &mfile]() { thread_func(reader, mfile); });
}
for (auto& t : threads) t.join();
}
// Entry point: any command-line argument selects the reader role; with no
// arguments the process acts as the writer.
int main(int argc, char **) {
  const bool run_as_reader = argc > 1;
  if (!run_as_reader) {
    writer_main();
    return 0;
  }
  reader_main();
  return 0;
}