下面的代码需要处理地图的大小并根据总大小进行旋转。 问题出在并发调用中。我无法避免写入比我预期更多的数据。我想拥有无锁支持数据存储的优势,但不知道什么样的模式可以帮助我。
struct Store {
map: DashMap<String, Vec<u8>>,
size: RwLock<usize>,
}
impl Store {
pub fn put(&self, key: String, buf: Vec<u8>) {
let clean_needed = {
let size = self.size.read().unwrap();
println!("Size {}", *size);
*size >= 50000usize
};
let buff_size = buf.len();
if clean_needed {
let mut size = self.size.write().unwrap();
*size = 0;
self.map.clear();
println!("Store cleaned");
} else {
let mut size = self.size.write().unwrap();
*size += buff_size;
self.map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
println!("Buff added: {}", buff_size);
}
}
}
你有一个竞争条件,因为你多次锁定
size
。注释你的代码来解释:
let clean_needed = {
// Multiple threads can take a read lock at the same time
let size = self.size.read().unwrap();
println!("Size {}", *size);
// If the size was less than 50000,
// All would conclude `clean_needed = false`
*size >= 50000usize
};
let buff_size = buf.len();
if clean_needed {
let mut size = self.size.write().unwrap();
*size = 0;
self.map.clear();
println!("Store cleaned");
} else {
// Then they will each sequentially take a write lock
let mut size = self.size.write().unwrap();
// And increment the size
*size += buff_size;
// And push their data in
self.map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
// Causing the size to overflow the limit
println!("Buff added: {}", buff_size);
}
解决方案是在整个例程中保持写锁:
struct Store {
map: DashMap<String, Vec<u8>>,
size: RwLock<usize>,
}
impl Store {
pub fn put(&self, key: String, buf: Vec<u8>) {
let mut size = self.size.write().unwrap();
let clean_needed = {
println!("Size {}", *size);
*size >= 50000usize
};
let buff_size = buf.len();
if clean_needed {
self.map.clear();
*size = 0;
println!("Store cleaned");
} else {
self.map
.entry(key)
.and_modify(|e| e.extend(buf))
.or_insert(buf); // Fixed
*size += buff_size;
println!("Buff added: {}", buff_size);
}
}
}