What is the correct way to write to a memory-mapped file from multiple threads without synchronization in Rust?
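I need to create a 40+ GB file using multiple threads. The file is used as a giant vector of u64 values. The threads do not need any kind of synchronization: each thread's output is unique to that thread, but the threads do not each get their own slice. Instead, the nature of the data guarantees that each thread generates a unique set of positions in the file to write to. A simple example: each thread writes to a position [ind / thread_count], where ind goes into the millions. For thread_count = 2, one thread writes to the odd positions and the other to the even ones.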
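I am using memmap2, a newer, maintained fork of the memmap library. memmap2 seems to do everything I need for access, but I don't know how to use it correctly from multiple threads.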
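If you can guarantee that your threads never write to the same offset, you can map the file into memory and write to it through raw pointers, for example with std::ptr::write.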
Example:
use std::fmt::Write;
use std::fs::File;
use memmap2::{MmapMut, MmapOptions};
fn main() {
    const CHUNK_SIZE: usize = 32;
    const TOTAL_SIZE: usize = 40 * usize::pow(2, 30);
    let file_name = {
        let mut p = std::env::temp_dir();
        p.push("many_gib_file.tmp");
        p
    };
    println!("Creating file {}", file_name.as_os_str().to_string_lossy());
    let file: File = File::options()
        .read(true)
        .write(true)
        .create(true)
        .open(&file_name)
        .expect("Failed to create file");
    file.set_len(TOTAL_SIZE.try_into().unwrap())
        .expect("Failed to resize file");
    // SAFETY (caller's responsibility): nothing else may modify or truncate
    // the file while it is mapped.
    let mut mmap: MmapMut = unsafe {
        MmapOptions::new()
            .offset(0)
            .len(TOTAL_SIZE)
            .map_mut(&file)
            .expect("Failed to map file")
    };
    let start_ptr = mmap.as_mut_ptr();
    // Raw pointers are not `Send`, so wrap the pointer to move it into threads.
    #[derive(Copy, Clone)]
    struct ShareablePtr(*mut u8);
    // SAFETY: We never alias data when writing from multiple threads.
    // Writer threads finish before unmapping.
    unsafe impl Send for ShareablePtr {}
    let start_ptr = ShareablePtr(start_ptr);
    std::thread::scope(|s| {
        // Even thread
        s.spawn(move || {
            // Rebind so the closure captures the whole `ShareablePtr`,
            // not just its non-`Send` field.
            let start_ptr = start_ptr;
            let start_ptr: *mut u8 = start_ptr.0;
            let mut value = String::with_capacity(CHUNK_SIZE);
            let mut offset = 0;
            while offset + CHUNK_SIZE <= TOTAL_SIZE {
                value.clear();
                write!(&mut value, "Even:{:26}\n", offset).unwrap();
                assert_eq!(value.len(), CHUNK_SIZE);
                unsafe {
                    // SAFETY: Writes from different threads never overlap.
                    // The pointer stays valid because scoped threads are
                    // joined before the mapping is dropped.
                    std::ptr::copy_nonoverlapping(
                        value.as_ptr(),
                        start_ptr.add(offset),
                        CHUNK_SIZE,
                    );
                }
                // Skip our own chunk and the odd thread's chunk.
                offset += 2 * CHUNK_SIZE;
            }
            // Make the writes visible to the main thread.
            // Not necessary with `std::thread::scope`, but may be necessary in your case.
            std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
        });
        // Odd thread
        s.spawn(move || {
            // Rebind so the closure captures the whole `ShareablePtr`.
            let start_ptr = start_ptr;
            let start_ptr: *mut u8 = start_ptr.0;
            let mut value = String::with_capacity(CHUNK_SIZE);
            // Start one chunk in so we never overlap with the even thread.
            let mut offset = CHUNK_SIZE;
            while offset + CHUNK_SIZE <= TOTAL_SIZE {
                value.clear();
                write!(&mut value, "Odd:{:27}\n", offset).unwrap();
                assert_eq!(value.len(), CHUNK_SIZE);
                unsafe {
                    // SAFETY: Writes from different threads never overlap.
                    // The pointer stays valid because scoped threads are
                    // joined before the mapping is dropped.
                    std::ptr::copy_nonoverlapping(
                        value.as_ptr(),
                        start_ptr.add(offset),
                        CHUNK_SIZE,
                    );
                }
                // Skip our own chunk and the even thread's chunk.
                offset += 2 * CHUNK_SIZE;
            }
            // Make the writes visible to the main thread.
            // Not necessary with `std::thread::scope`, but may be necessary in your case.
            std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
        });
    });
    // Observe the writes made by the worker threads.
    // Not necessary with `std::thread::scope`, but may be necessary in your case.
    std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
    mmap.flush().expect("Failed to flush");
    drop(mmap);
    drop(file);
    println!("Finished!");
}
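Note that you should probably flush the written chunks from time to time to avoid consuming too much RAM. I did not write that logic because it would make the example more complicated.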
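One possible shape for that flushing logic is sketched below. It is only an illustration: `WindowFlusher` and its `advance` method are hypothetical names, not part of memmap2, and the actual flush is passed in as a closure so the snippet compiles with the standard library alone. With memmap2, I would expect the closure to call something like `mmap.flush_range(offset, len)` (or `flush_async_range`), but how to share the mapping with the writer threads is left as an assumption here.

```rust
use std::io;

/// Hypothetical helper: tracks how far one writer has progressed and
/// invokes `flush(offset, len)` once per completed window of `window` bytes.
struct WindowFlusher<F>
where
    F: FnMut(usize, usize) -> io::Result<()>,
{
    window: usize,
    flushed_up_to: usize,
    flush: F,
}

impl<F> WindowFlusher<F>
where
    F: FnMut(usize, usize) -> io::Result<()>,
{
    fn new(window: usize, flush: F) -> Self {
        assert!(window > 0, "window must be non-zero");
        WindowFlusher { window, flushed_up_to: 0, flush }
    }

    /// Call after each write; `written_up_to` is the end offset of that write.
    /// Flushes every full window that now lies entirely below `written_up_to`.
    fn advance(&mut self, written_up_to: usize) -> io::Result<()> {
        while written_up_to.saturating_sub(self.flushed_up_to) >= self.window {
            (self.flush)(self.flushed_up_to, self.window)?;
            self.flushed_up_to += self.window;
        }
        Ok(())
    }
}
```

In the example above, each writer thread could own one such flusher and call `advance(offset + CHUNK_SIZE)` after every write. A window flushed by one thread may include chunks the other thread has not written yet; that should be harmless, because the final `mmap.flush()` still covers the whole file.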
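If your writes really never overlap, the data should make its way correctly from the CPU caches to RAM and from RAM to disk without any additional synchronization between the writers, because that is the responsibility of the CPU and the OS.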
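Since the question is about u64 values rather than text chunks, here is a minimal sketch of the same pattern with interleaved u64 slots. To keep it runnable without the crate, a `Vec<u64>` stands in for the mapped region; with a real mapping I would assume the base pointer comes from `mmap.as_mut_ptr().cast::<u64>()`, which relies on the mapping being page-aligned. `SharedPtr` and `fill_interleaved` are names invented for this illustration, and the value written (`i * 10`) is a placeholder.

```rust
use std::thread;

/// Wrapper that lets a raw pointer be moved into worker threads.
#[derive(Copy, Clone)]
struct SharedPtr(*mut u64);

// SAFETY: every thread writes a disjoint set of slots, and the buffer
// outlives all writer threads because they are scoped.
unsafe impl Send for SharedPtr {}

/// Fills `len` u64 slots from `thread_count` threads. Thread `t` writes
/// exactly the slots whose index `i` satisfies `i % thread_count == t`.
fn fill_interleaved(len: usize, thread_count: usize) -> Vec<u64> {
    // Stand-in for the memory-mapped region.
    let mut data = vec![0u64; len];
    let base = SharedPtr(data.as_mut_ptr());

    thread::scope(|s| {
        for t in 0..thread_count {
            s.spawn(move || {
                // Rebind so the closure captures the whole wrapper,
                // not just its non-`Send` pointer field.
                let base = base;
                let ptr = base.0;
                let mut i = t;
                while i < len {
                    // SAFETY: `i < len`, and no other thread writes slot `i`.
                    unsafe { ptr.add(i).write(i as u64 * 10) };
                    i += thread_count;
                }
            });
        }
    });
    // All threads have been joined here, so their writes are visible.
    data
}
```

For instance, `fill_interleaved(6, 2)` has thread 0 write slots 0, 2, 4 and thread 1 write slots 1, 3, 5, with no locking or atomics involved.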