如何处理无锁数据结构的原子更改?

问题描述 投票:0回答:1

下面的代码需要处理地图的大小并根据总大小进行旋转。 问题出在并发调用中。我无法避免写入比我预期更多的数据。我想拥有无锁支持数据存储的优势,但不知道什么样的模式可以帮助我。

struct Store {
    map: DashMap<String, Vec<u8>>,
    size: RwLock<usize>,
}
impl Store {
    pub fn put(&self, key: String, buf: Vec<u8>) {
        let clean_needed = {
            let size = self.size.read().unwrap();
            println!("Size {}", *size);
            *size  >= 50000usize
        };
        let buff_size = buf.len();
        if clean_needed {
            let mut size = self.size.write().unwrap();
            *size = 0;
            self.map.clear();
            println!("Store cleaned");
        } else {
            let mut size = self.size.write().unwrap();
            *size += buff_size;
            self.map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
            println!("Buff added: {}", buff_size);
        }
    }
} 
rust concurrency
1个回答
0
投票

你有一个竞争条件,因为你多次锁定

size
。注释你的代码来解释:

        let clean_needed = {
            // Multiple threads can take a read lock at the same time
            let size = self.size.read().unwrap();
            println!("Size {}", *size);
            // If the size was less than 50000,
            // All would conclude `clean_needed = false`
            *size  >= 50000usize
        };
        let buff_size = buf.len();
        if clean_needed {
            let mut size = self.size.write().unwrap();
            *size = 0;
            self.map.clear();
            println!("Store cleaned");
        } else {
            // Then they will each sequentially take a write lock
            let mut size = self.size.write().unwrap();
            // And increment the size
            *size += buff_size;
            // And push their data in
            self.map.entry(key).and_modify(|e| e.extend(buf)).or_insert(Vec::new());
            // Causing the size to overflow the limit
            println!("Buff added: {}", buff_size);
        }

解决方案是在整个例程中保持写锁:

struct Store {
    map: DashMap<String, Vec<u8>>,
    size: RwLock<usize>,
}
impl Store {
    pub fn put(&self, key: String, buf: Vec<u8>) {
        let mut size = self.size.write().unwrap();

        let clean_needed = {
            println!("Size {}", *size);
            *size >= 50000usize
        };
        let buff_size = buf.len();
        if clean_needed {
            self.map.clear();
            *size = 0;
            println!("Store cleaned");
        } else {
            self.map
                .entry(key)
                .and_modify(|e| e.extend(buf))
                .or_insert(buf); // Fixed
            *size += buff_size;
            println!("Buff added: {}", buff_size);
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.