我正在尝试在
HashMap
后面使用 RwLock
来实现共享状态、线程安全的缓存。理想情况下,我希望我的接口公开一个函数,该函数(给定键)将返回对缓存中值的引用(如果需要,首先填充它)。然而,我的中级 Rust 直觉告诉我,这种方法至少会遇到以下问题:
是否有可能返回对该值的引用? AFAIK,
HashMap
可以在需要时自由重新分配(*),因此引用的生命周期与 HashMap
不同,并且实际上是不确定的(就编译器而言)。 HashMap::entry
API 返回一个引用,但当然,它的生存时间不会超过其封闭范围。因此是否有必要返回一个具体的值,迫使我为值类型实现Copy
?
(*) 如果这些值未实现
Copy
,这仍然成立吗?
RwLock
对我来说是新的。我从 Mutex
开始,但 RwLock
看起来更合适,因为预期的缓存访问模式(即每个键写入一次,读取多次)。但是,我不确定是否可以将读锁“升级”为读写锁。 Rust 文档表明读锁会阻塞写锁,这会导致我建议/希望它工作的方式出现死锁。我可以放弃读锁并用写锁替换它,还是从一开始就使用完整的Mutex
更容易?
顺便说一句,获取值的函数是异步的,因此虽然我可以使用
HashMap::entry
API,但我无法使用 Entry
便利函数(如 or_insert
)...
无论如何,也许我有点难以理解,但这就是我到目前为止所得到的:
// NOTE The `Key` struct contains a reference, with lifetime `'key`
pub struct Cache<'key>(RwLock<HashMap<Key<'key>, Value>>);
impl<'cache, 'key> Cache<'key> {
pub fn new() -> Arc<Self> {
Arc::new(Self(RwLock::new(HashMap::new())))
}
// NOTE The key and value for the cache is derived from the `Payload` type
pub async fn fetch(
&'cache mut self,
data: &'cache Payload<'key>
) -> Result<&'cache Value> {
let cache = self.0.read()?;
let key = data.into();
Ok(match &mut cache.entry(key) {
// Match arm type: &Value
// FIXME This reference outlives the scope, which is a borrow violation
Entry::Occupied(entry) => entry.get(),
// Dragons be here...
Entry::Vacant(ref mut slot) => {
// FIXME How do I create a write lock here, without deadlocking on
// the read lock that's still in scope?
let value = data.get_value().await?;
// FIXME `VacantEntry<'_, Key<'_>, Value>` doesn't implement `Copy`
slot.insert(value)
}
})
}
}
关于你问题的第二部分(将读锁升级为读写锁),我什么也没有。 :-(
虽然,关于第一部分(在哈希映射发生变异时返回对哈希映射内容的引用),我有一个线索。
在这种情况下,引用的元素基本上是共享的,所有权不仅仅是缓存本身的问题。 例如,如果我们决定在过程中的某个时刻清除缓存,则不得删除仍然引用的元素。 因此,我们需要缓存中每个值的共享所有权。 我们不需要存储
Value
,而是需要存储 Arc<Value>
,以便在需要时使值比缓存更长久。
请在下面找到针对过于简化的情况(无泛型类型,硬编码示例)实现此目的的尝试:将字符串与其字符的代码向量相关联。 串行实现依赖于
RefCell
和 Rc
,而并行实现依赖于它们的同步等效项 Mutex
和 Arc
。
在每个演示中,我们可以看到,当使用相同的字符串时,获得的切片的地址保持不变(向量被缓存)。
mod serial {
use std::{cell::RefCell, collections::HashMap, rc::Rc};
pub struct Cache {
data: RefCell<HashMap<String, Rc<Vec<u32>>>>,
}
impl Cache {
pub fn new() -> Self {
Self {
data: RefCell::new(HashMap::new()),
}
}
pub fn fetch(
&self,
key: &str,
) -> Rc<Vec<u32>> {
let mut data = self.data.borrow_mut();
match data.get(key) {
Some(v) => Rc::clone(v),
None => {
let v = Rc::new(key.chars().map(|c| c as u32).collect());
data.insert(key.to_owned(), Rc::clone(&v));
v
}
}
}
}
pub fn demo() {
println!("~~~~ serial demo ~~~~");
let cache = Cache::new();
for it in 0..2 {
for key in ["abc", "def", "ghi"] {
let v = cache.fetch(key);
println!("iter {}: {:?} @ {:?}", it, v, v.as_ptr());
}
}
}
}
mod parallel {
use std::{collections::HashMap, sync::Arc, sync::Mutex};
pub struct Cache {
data: Mutex<HashMap<String, Arc<Vec<u32>>>>,
}
impl Cache {
pub fn new() -> Self {
Self {
data: Mutex::new(HashMap::new()),
}
}
pub fn fetch(
&self,
key: &str,
) -> Arc<Vec<u32>> {
let mut data = self.data.lock().unwrap();
match data.get(key) {
Some(v) => Arc::clone(v),
None => {
let v = Arc::new(key.chars().map(|c| c as u32).collect());
data.insert(key.to_owned(), Arc::clone(&v));
v
}
}
}
}
pub fn demo() {
println!("~~~~ parallel demo ~~~~");
let cache = Cache::new();
std::thread::scope(|s| {
s.spawn(|| {
for key in ["abc", "def", "ghi"] {
let v = cache.fetch(key);
println!("thread 1: {:?} @ {:?}", v, v.as_ptr());
}
});
s.spawn(|| {
for key in ["abc", "def", "ghi"] {
let v = cache.fetch(key);
println!("thread 2: {:?} @ {:?}", v, v.as_ptr());
}
});
});
}
}
fn main() {
serial::demo();
parallel::demo();
}
/*
~~~~ serial demo ~~~~
iter 0: [97, 98, 99] @ 0x559b204eaba0
iter 0: [100, 101, 102] @ 0x559b204eabe0
iter 0: [103, 104, 105] @ 0x559b204eac20
iter 1: [97, 98, 99] @ 0x559b204eaba0
iter 1: [100, 101, 102] @ 0x559b204eabe0
iter 1: [103, 104, 105] @ 0x559b204eac20
~~~~ parallel demo ~~~~
thread 1: [97, 98, 99] @ 0x7f4fb8000ca0
thread 1: [100, 101, 102] @ 0x7f4fb8000ce0
thread 1: [103, 104, 105] @ 0x7f4fb8000d20
thread 2: [97, 98, 99] @ 0x7f4fb8000ca0
thread 2: [100, 101, 102] @ 0x7f4fb8000ce0
thread 2: [103, 104, 105] @ 0x7f4fb8000d20
*/