尝试实现线程安全的缓存

问题描述 投票:0回答:1

我正在尝试在

HashMap
后面使用
RwLock
来实现共享状态、线程安全的缓存。理想情况下,我希望我的接口公开一个函数,该函数(给定键)将返回对缓存中值的引用(如果需要,首先填充它)。然而,我的中级 Rust 直觉告诉我,这种方法至少会遇到以下问题:

  1. 是否有可能返回对该值的引用? AFAIK,

    HashMap
    可以在需要时自由重新分配(*),因此引用的生命周期与
    HashMap
    不同,并且实际上是不确定的(就编译器而言)。
    HashMap::entry
    API 返回一个引用,但当然,它的生存时间不会超过其封闭范围。因此是否有必要返回一个具体的值,迫使我为值类型实现
    Copy

    (*) 如果这些值未实现

    Copy
    ,这仍然成立吗?

  2. RwLock
    对我来说是新的。我从
    Mutex
    开始,但
    RwLock
    看起来更合适,因为预期的缓存访问模式(即每个键写入一次,读取多次)。但是,我不确定是否可以将读锁“升级”为读写锁。 Rust 文档表明读锁会阻塞写锁,这会导致我建议/希望它工作的方式出现死锁。我可以放弃读锁并用写锁替换它,还是从一开始就使用完整的
    Mutex
    更容易?

顺便说一句,获取值的函数是异步的,因此虽然我可以使用

HashMap::entry
API,但我无法使用
Entry
便利函数(如
or_insert
)...

无论如何,也许我有点难以理解,但这就是我到目前为止所得到的:

// NOTE The `Key` struct contains a reference, with lifetime `'key`
pub struct Cache<'key>(RwLock<HashMap<Key<'key>, Value>>);

impl<'cache, 'key> Cache<'key> {
    pub fn new() -> Arc<Self> {
        Arc::new(Self(RwLock::new(HashMap::new())))
    }

    // NOTE The key and value for the cache is derived from the `Payload` type
    pub async fn fetch(
        &'cache mut self,
        data: &'cache Payload<'key>
    ) -> Result<&'cache Value> {
        let cache = self.0.read()?;
        let key = data.into();

        Ok(match &mut cache.entry(key) {
            // Match arm type: &Value
            // FIXME This reference outlives the scope, which is a borrow violation
            Entry::Occupied(entry) => entry.get(),

            // Dragons be here...
            Entry::Vacant(ref mut slot) => {
                // FIXME How do I create a write lock here, without deadlocking on
                // the read lock that's still in scope?

                let value = data.get_value().await?;

                // FIXME `VacantEntry<'_, Key<'_>, Value>` doesn't implement `Copy`
                slot.insert(value)
            }
        })
    }
}
rust hashmap thread-safety borrow-checker
1个回答
0
投票

关于你问题的第二部分(将读锁升级为读写锁),我什么也没有。 :-(

虽然,关于第一部分(在哈希映射发生变异时返回对哈希映射内容的引用),我有一个线索。

在这种情况下,引用的元素基本上是共享的,所有权不仅仅是缓存本身的问题。 例如,如果我们决定在过程中的某个时刻清除缓存,则不得删除仍然引用的元素。 因此,我们需要缓存中每个值的共享所有权。 我们不需要存储

Value
,而是需要存储
Arc<Value>
,以便在需要时使值比缓存更长久。

请在下面找到针对过于简化的情况(无泛型类型,硬编码示例)实现此目的的尝试:将字符串与其字符的代码向量相关联。 串行实现依赖于

RefCell
Rc
,而并行实现依赖于它们的同步等效项
Mutex
Arc
。 在每个演示中,我们可以看到,当使用相同的字符串时,获得的切片的地址保持不变(向量被缓存)。

mod serial {
    use std::{cell::RefCell, collections::HashMap, rc::Rc};

    pub struct Cache {
        data: RefCell<HashMap<String, Rc<Vec<u32>>>>,
    }

    impl Cache {
        pub fn new() -> Self {
            Self {
                data: RefCell::new(HashMap::new()),
            }
        }

        pub fn fetch(
            &self,
            key: &str,
        ) -> Rc<Vec<u32>> {
            let mut data = self.data.borrow_mut();
            match data.get(key) {
                Some(v) => Rc::clone(v),
                None => {
                    let v = Rc::new(key.chars().map(|c| c as u32).collect());
                    data.insert(key.to_owned(), Rc::clone(&v));
                    v
                }
            }
        }
    }

    pub fn demo() {
        println!("~~~~ serial demo ~~~~");
        let cache = Cache::new();
        for it in 0..2 {
            for key in ["abc", "def", "ghi"] {
                let v = cache.fetch(key);
                println!("iter {}: {:?} @ {:?}", it, v, v.as_ptr());
            }
        }
    }
}

mod parallel {
    use std::{collections::HashMap, sync::Arc, sync::Mutex};

    pub struct Cache {
        data: Mutex<HashMap<String, Arc<Vec<u32>>>>,
    }

    impl Cache {
        pub fn new() -> Self {
            Self {
                data: Mutex::new(HashMap::new()),
            }
        }

        pub fn fetch(
            &self,
            key: &str,
        ) -> Arc<Vec<u32>> {
            let mut data = self.data.lock().unwrap();
            match data.get(key) {
                Some(v) => Arc::clone(v),
                None => {
                    let v = Arc::new(key.chars().map(|c| c as u32).collect());
                    data.insert(key.to_owned(), Arc::clone(&v));
                    v
                }
            }
        }
    }

    pub fn demo() {
        println!("~~~~ parallel demo ~~~~");
        let cache = Cache::new();
        std::thread::scope(|s| {
            s.spawn(|| {
                for key in ["abc", "def", "ghi"] {
                    let v = cache.fetch(key);
                    println!("thread 1: {:?} @ {:?}", v, v.as_ptr());
                }
            });
            s.spawn(|| {
                for key in ["abc", "def", "ghi"] {
                    let v = cache.fetch(key);
                    println!("thread 2: {:?} @ {:?}", v, v.as_ptr());
                }
            });
        });
    }
}

fn main() {
    serial::demo();
    parallel::demo();
}
/*
~~~~ serial demo ~~~~
iter 0: [97, 98, 99] @ 0x559b204eaba0
iter 0: [100, 101, 102] @ 0x559b204eabe0
iter 0: [103, 104, 105] @ 0x559b204eac20
iter 1: [97, 98, 99] @ 0x559b204eaba0
iter 1: [100, 101, 102] @ 0x559b204eabe0
iter 1: [103, 104, 105] @ 0x559b204eac20
~~~~ parallel demo ~~~~
thread 1: [97, 98, 99] @ 0x7f4fb8000ca0
thread 1: [100, 101, 102] @ 0x7f4fb8000ce0
thread 1: [103, 104, 105] @ 0x7f4fb8000d20
thread 2: [97, 98, 99] @ 0x7f4fb8000ca0
thread 2: [100, 101, 102] @ 0x7f4fb8000ce0
thread 2: [103, 104, 105] @ 0x7f4fb8000d20
*/
© www.soinside.com 2019 - 2024. All rights reserved.