Writing to a memory-mapped file from multiple threads in Rust

Question

What is the correct way in Rust to write to a memory-mapped file from multiple threads without any synchronization?

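I need to create a 40+ GB file using multiple threads. The file is used as a giant vector of u64 values. The threads do not need any kind of synchronization: each thread's output is unique to that thread, but the threads do not each get their own slice. Instead, the nature of the data ensures that each thread generates a unique set of positions in the file to write to. A simplistic example: each thread writes to a position `[ind / thread_count]`, where `ind` goes into the millions. For `thread_count = 2`, one thread writes to the odd positions and the other to the even positions.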

I am using memmap2, a maintained fork of the memmap crate. memmap2 seems to do everything I need for access, but I don't know how to use it correctly from multiple threads.

multithreading rust mmap memory-mapped-files
1 Answer

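If you can guarantee that your threads never write to the same offset, you can map the file into memory and write to it through raw pointers with `std::ptr::write` (or `std::ptr::copy_nonoverlapping` for byte ranges, as in the example).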

Example:

use std::fmt::Write;
use std::fs::File;

use memmap2::{MmapMut, MmapOptions};

fn main() {
    const CHUNK_SIZE: usize = 32;
    const TOTAL_SIZE: usize = 40 * usize::pow(2, 30);

    let file_name = {
        let mut p = std::env::temp_dir();
        p.push("many_gib_file.tmp");
        p
    };
    println!("Creating file {}", file_name.as_os_str().to_string_lossy());
    let file: File = File::options()
        .read(true)
        .write(true)
        .create(true)
        .open(&file_name)
        .expect("Failed to create file");
    file.set_len(TOTAL_SIZE.try_into().unwrap())
        .expect("Failed to resize file");

    let mut mmap: MmapMut = unsafe {
        MmapOptions::new()
            .offset(0)
            .len(TOTAL_SIZE)
            .map_mut(&file)
            .expect("Failed to map file")
    };

    let start_ptr = mmap.as_mut_ptr();

    #[derive(Copy, Clone)]
    struct ShareablePtr(*mut u8);

    // SAFETY: We never alias data when writing from multiple threads.
    // Writer threads finish before unmapping.
    unsafe impl Send for ShareablePtr {}

    let start_ptr = ShareablePtr(start_ptr);
    std::thread::scope(|s| {
        // Even thread
        s.spawn(move || {
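            // Rebind so the closure captures the whole `ShareablePtr` (which is `Send`),
            // not just its raw-pointer field.
            let start_ptr = start_ptr;
            let start_ptr: *mut u8 = start_ptr.0;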

            let mut value = String::with_capacity(CHUNK_SIZE);
            let mut offset = 0;
            while offset + CHUNK_SIZE <= TOTAL_SIZE {
                value.clear();

                write!(&mut value, "Even:{:26}\n", offset).unwrap();
                assert_eq!(value.len(), CHUNK_SIZE);

                unsafe {
                    // SAFETY: Writes from different threads never overlap.
                    // The pointer stays valid because scoped threads finish before `mmap` is dropped.
                    std::ptr::copy_nonoverlapping(
                        value.as_ptr(),
                        start_ptr.add(offset),
                        CHUNK_SIZE,
                    );
                }

                // Skip past our own chunk and the odd thread's chunk.
                offset += 2 * CHUNK_SIZE;
            }

            // Make the writes visible to the main thread.
            // Not necessary with `std::thread::scope`, but may be necessary in your case.
            std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
        });
        // Odd thread
        s.spawn(move || {
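            // Rebind so the closure captures the whole `ShareablePtr` (which is `Send`),
            // not just its raw-pointer field.
            let start_ptr = start_ptr;
            let start_ptr: *mut u8 = start_ptr.0;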

            let mut value = String::with_capacity(CHUNK_SIZE);
            // Start one chunk in so we never overlap with the even thread.
            let mut offset = CHUNK_SIZE;
            while offset + CHUNK_SIZE <= TOTAL_SIZE {
                value.clear();

                write!(&mut value, "Odd:{:27}\n", offset).unwrap();
                assert_eq!(value.len(), CHUNK_SIZE);

                unsafe {
                    // SAFETY: Writes from different threads never overlap.
                    // The pointer stays valid because scoped threads finish before `mmap` is dropped.
                    std::ptr::copy_nonoverlapping(
                        value.as_ptr(),
                        start_ptr.add(offset),
                        CHUNK_SIZE,
                    );
                }

                // Skip past our own chunk and the even thread's chunk.
                offset += 2 * CHUNK_SIZE;
            }

            // Make the writes visible to the main thread.
            // Not necessary with `std::thread::scope`, but may be necessary in your case.
            std::sync::atomic::fence(std::sync::atomic::Ordering::Release);
        });
    });

    // Make the writer threads' data visible to the main thread.
    // Not necessary with `std::thread::scope`, but may be necessary in your case.
    std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);

    mmap.flush().expect("Failed to flush");
    drop(mmap);
    drop(file);

    println!("Finished!");
}
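
Since the question is about a vector of u64 values rather than text chunks, here is a minimal sketch of the same idea with N threads and `u64` slots. To keep it self-contained it uses an in-memory `Vec<u64>` as a stand-in for the mapping; with memmap2 you would presumably cast the pointer from `as_mut_ptr()` to `*mut u64` (a mapping is page-aligned, so the alignment should be fine). The names `SharedPtr` and `fill_interleaved`, and the choice of storing the index itself as the value, are made up for illustration.

```rust
use std::thread;

/// Wrapper that lets a raw pointer cross thread boundaries.
#[derive(Copy, Clone)]
struct SharedPtr(*mut u64);

// SAFETY (assumption of this sketch): every thread writes only to indices
// it owns, and all threads are joined before the buffer is read or freed.
unsafe impl Send for SharedPtr {}

/// Fills `buf` so that thread `t` writes indices t, t + thread_count, ...
/// The stored value is hypothetical: simply the index itself.
fn fill_interleaved(buf: &mut [u64], thread_count: usize) {
    assert!(thread_count > 0, "need at least one thread");
    let len = buf.len();
    let base = SharedPtr(buf.as_mut_ptr());

    thread::scope(|s| {
        for t in 0..thread_count {
            s.spawn(move || {
                // Rebind so the closure captures the whole wrapper
                // (which is Send), not just its raw-pointer field.
                let base = base;
                let mut ind = t;
                while ind < len {
                    // SAFETY: ind < len, and ind % thread_count == t,
                    // so no other thread ever touches this slot.
                    unsafe { base.0.add(ind).write(ind as u64) };
                    ind += thread_count;
                }
            });
        }
    });
}

fn main() {
    let mut data = vec![0u64; 1_000];
    fill_interleaved(&mut data, 4);
    // Every slot was written exactly once, by the thread that owns it.
    assert!(data.iter().enumerate().all(|(i, &v)| v == i as u64));
    println!("all {} slots written", data.len());
}
```

The scoped threads are all joined when `thread::scope` returns, which is what makes it sound to read the buffer afterwards without extra fences.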

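Note that you should probably flush the written chunks from time to time to avoid consuming too much RAM. I did not write that logic because it would have added complexity to my example.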
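
One possible shape for that logic, as a rough sketch only: process the file window by window, and flush each window once its writes are done. The helper below just does the range bookkeeping and hands every `(offset, len)` pair to a closure. In real code the closure would run the writer threads restricted to that range and then call something like memmap2's `flush_range(offset, len)`; here it only records the ranges so the snippet runs without the crate. The name `for_each_window` and the window size are assumptions, not part of the original answer.

```rust
use std::io;

/// Calls `per_window(offset, len)` once for each `window`-sized piece of
/// `0..total`, in order. The last piece may be shorter.
/// Returns the number of calls made.
fn for_each_window<F>(total: usize, window: usize, mut per_window: F) -> io::Result<usize>
where
    F: FnMut(usize, usize) -> io::Result<()>,
{
    assert!(window > 0, "window must be non-zero");
    let mut calls = 0;
    let mut offset = 0;
    while offset < total {
        let len = window.min(total - offset);
        // In real code: write this range from the worker threads,
        // then flush it before moving on.
        per_window(offset, len)?;
        offset += len;
        calls += 1;
    }
    Ok(calls)
}

fn main() -> io::Result<()> {
    // Stand-in for "write, then flush": just record the requested ranges.
    let mut seen = Vec::new();
    let calls = for_each_window(10, 4, |offset, len| {
        seen.push((offset, len));
        Ok(())
    })?;
    assert_eq!(calls, 3);
    assert_eq!(seen, vec![(0, 4), (4, 4), (8, 2)]);
    Ok(())
}
```

Working one window at a time costs some parallelism at the window boundaries, but it keeps the "which ranges are finished" question trivial.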

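If your writes truly never overlap, the data should be written correctly from the CPU caches to RAM and from RAM to disk even without any extra synchronization between your sections, because that is the responsibility of the CPU and the OS.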
