I need to run code in critical sections in an externally determined order.
I currently came up with a ticket-based lock that uses a mutex and a condvar to put other threads to sleep. Unfortunately, it suffers from the thundering herd problem.
My current code (simplified pseudocode):
struct TicketMutex {
    ticket_dispenser: AtomicUsize, // starts at 0
    ticket_serving: AtomicUsize,   // starts at 0
    unfair_mutex: Mutex,
    condvar: Condvar,
    data: *T,
}

// Note that this is called before locking the mutex.
// The mutex must be locked in the same order as the returned values.
// The caller MUST call `lock` using the returned value.
fn acquire_ticket(ticket_mutex: &TicketMutex) -> usize {
    ticket_mutex.ticket_dispenser.fetch_add(1, Relaxed)
}

// This acquires the lock for the given ticket.
fn lock(ticket_mutex: &TicketMutex, ticket: usize) {
    if ticket_mutex.ticket_serving.load(Acquire) == ticket {
        // Success!
        // 0 syscalls and 0 writes here.
        return;
    }
    let unique_lock = ticket_mutex.unfair_mutex.lock();
    // A thundering herd occurs here because I wake all threads at once. ------ PROBLEM 1
    while ticket_mutex.ticket_serving.load(Acquire) != ticket {
        ticket_mutex.condvar.wait(unique_lock);
    }
    // I don't need to hold the mutex during execution
    // because I track the lock state via the `ticket_serving` field.
    unique_lock.unlock();
}

fn unlock(ticket_mutex: &TicketMutex, ticket: usize) {
    // Note that we can update `ticket_serving` non-atomically,
    // because until that update we are in an exclusive critical section.
    let next_ticket = ticket + 1;
    ticket_mutex.ticket_serving.store(next_ticket, Release);
    // The problem is that I always do 1-2 syscalls here ------ PROBLEM 2
    // even if there is no contention at all (and no waiters).
    // All my attempts to put this behind some condition
    // led to deadlocks.
    let unique_lock = ticket_mutex.unfair_mutex.lock();
    ticket_mutex.condvar.notify_all();
    unique_lock.unlock();
}
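To make the intended usage concrete, here is a short sketch in the same simplified pseudocode. The orchestrator/worker split and the names (`m`, `first`, `second`) are my illustration, not part of the original design: tickets are taken in the externally determined order, and each worker redeems its own ticket later.

// Orchestrator: takes tickets in the externally determined order
// and hands one to each worker.
let first = acquire_ticket(&m);  // this worker must enter first
let second = acquire_ticket(&m); // this worker must enter second

// The worker holding `second` may call lock() before `first` does,
// but it sleeps until the `first` critical section has finished.
lock(&m, second);
// ... critical section that must run second ...
unlock(&m, second);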
How can I remove the condvar thundering herd in the `lock` function? I am thinking about giving each thread its own condvar as a local variable and putting it into an intrusive linked list with a head pointer, but verifying the correctness of that is tricky and hard.
How can I avoid making syscalls in `unlock`?

I don't know why this question got so much hate. Anyway, I came up with two options myself.
Since the C++-like pseudocode in the question wasn't liked, I won't attempt pseudocode and will just write Rust. However, for brevity I will omit mutex poison handling.
Option 1 is simpler: it reduces the thundering herd, but a lock-unlock iteration still always makes at least one syscall.
Option 2 is less simple, but it avoids syscalls entirely when there is no contention and guarantees that no thundering herd occurs.

Option 1: an array of condvars, with each thread waiting on the condvar chosen by its ticket. Depending on the size of the array, this reduces the impact of the thundering herd: each notification wakes roughly the number of threads in the wait loop divided by the array size, which is easily brought down to 1 by using a large array. For example, with N = 64 condvars and 128 waiters, each `notify_all` wakes about 2 threads.
// In the struct
condvars: [Condvar; N],

// Waiting
if ticket != self.serving.load(Acquire) {
    let mut lock_guard = self.unfair_mutex.lock();
    while ticket != self.serving.load(Acquire) {
        lock_guard = self.condvars[ticket % N].wait(lock_guard);
    }
}

// Unlocking
let next_ticket = ticket + 1;
self.serving.store(next_ticket, Release);
let _lock_guard = self.unfair_mutex.lock();
// notify_all is needed because if there are more than
// N threads, several may wait on the same condvar.
self.condvars[next_ticket % N].notify_all();
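For completeness, here is Option 1 assembled into a self-contained type. This is a minimal sketch under my simplifications: std's `Mutex<()>` and `Condvar`, `unwrap()` standing in for the omitted poison handling, and `N = 64` chosen arbitrarily; the name `TicketMutexV1` is mine.

use std::array;
use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Relaxed, Release}};
use std::sync::{Condvar, Mutex};

const N: usize = 64; // more condvars => fewer waiters woken per notify_all

pub struct TicketMutexV1 {
    dispenser: AtomicUsize,
    serving: AtomicUsize,
    unfair_mutex: Mutex<()>,
    condvars: [Condvar; N],
}

impl TicketMutexV1 {
    pub fn new() -> Self {
        Self {
            dispenser: AtomicUsize::new(0),
            serving: AtomicUsize::new(0),
            unfair_mutex: Mutex::new(()),
            condvars: array::from_fn(|_| Condvar::new()),
        }
    }

    pub fn get_ticket(&self) -> usize {
        self.dispenser.fetch_add(1, Relaxed)
    }

    pub fn lock(&self, ticket: usize) {
        if ticket != self.serving.load(Acquire) {
            let mut guard = self.unfair_mutex.lock().unwrap();
            while ticket != self.serving.load(Acquire) {
                // Each ticket sleeps on "its" condvar; only tickets
                // that collide modulo N share one.
                guard = self.condvars[ticket % N].wait(guard).unwrap();
            }
        }
    }

    pub fn unlock(&self, ticket: usize) {
        let next_ticket = ticket.wrapping_add(1);
        self.serving.store(next_ticket, Release);
        let _guard = self.unfair_mutex.lock().unwrap();
        // notify_all because with more than N threads,
        // several tickets may wait on the same condvar.
        self.condvars[next_ticket % N].notify_all();
    }
}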
Option 2: each waiting thread creates its own condvar as a local variable on its stack. The waiters are linked into an intrusive linked list, pointed to by an atomic pointer inside the mutex. This is safe because each value lives until the lock is successfully acquired, and the list is updated only while the mutex is held.
This option is only viable if creating a condvar is cheap. On Windows and Linux, condition variables are very cheap for one-shot use, so they can be created on demand.
The main advantage of this solution: I am satisfied with its measured performance. Compared to the code with `notify_all` on a single condvar, it doubled throughput when the thread count is below the number of CPU cores, and improved it 5x when the CPU is oversubscribed (more threads than cores).
struct TicketedMutex<T> {
    dispenser: AtomicUsize,
    serving: AtomicUsize,
    unfair_mutex: Mutex<()>,
    payload: UnsafeCell<T>,
    // This is a linked list of all sleeping threads
    // that need to be awoken when unlocking the mutex.
    // They are ordered by ticket.
    // The pointer itself may be read outside of `unfair_mutex`,
    // but the elements must be read or updated only while it is locked.
    // The pointee must be updated only inside the `unfair_mutex` lock.
    // It is important to note that each access to a list element is
    // a guaranteed L1 cache miss, because the elements are allocated
    // on the stacks of different threads.
    waiters_head_ptr: AtomicPtr<Waiter>,
}

struct Waiter {
    condvar: Condvar,
    ticket: usize,
    next: Cell<*const Waiter>,
}

pub struct TicketedMutexGuard<'a, T> {
    ticket: usize,
    ticketed_mutex: &'a TicketedMutex<T>,
    payload: *mut T,
}

unsafe impl<T: Sync + Send> Sync for TicketedMutex<T> {}
unsafe impl<T: Sync + Send> Send for TicketedMutex<T> {}

impl<T> TicketedMutex<T> {
    pub fn new(val: T) -> Self {
        Self {
            dispenser: AtomicUsize::new(0),
            serving: AtomicUsize::new(0),
            unfair_mutex: Mutex::new(()),
            payload: UnsafeCell::new(val),
            waiters_head_ptr: AtomicPtr::new(core::ptr::null_mut()),
        }
    }

    // The caller must always invoke `lock` with the acquired ticket at some point.
    // It must not call this method again before locking with the previously acquired ticket.
    pub unsafe fn get_ticket(&self) -> usize {
        self.dispenser.fetch_add(1, Relaxed)
    }

    pub fn lock(&self, ticket: usize) -> TicketedMutexGuard<T> {
        if ticket != self.serving.load(Acquire) {
            let waiter = Waiter {
                condvar: Condvar::new(),
                ticket,
                next: Cell::new(core::ptr::null()),
            };
            let mut guard = self.unfair_mutex.lock();
            // Sharing `waiter` with other threads is OK because
            // it lives until we acquire the lock.
            // After we acquire the lock, we remove it from the list,
            // so other threads will not see it.
            let new_head = unsafe {
                let old_head: *const _ = self.waiters_head_ptr.load(Relaxed);
                add_to_linked_list_ordered(old_head, &waiter)
            };
            // This is a trick to implement an `Acquire` store.
            // A `swap` with `Acquire` ordering synchronizes us
            // with the `Release` `fetch_add` in `unlock`, preventing
            // the `self.serving.load` in the loop below from being
            // reordered before this store.
            self.waiters_head_ptr.swap(new_head as *mut Waiter, Acquire);
            while ticket != self.serving.load(Acquire) {
                guard = waiter.condvar.wait(guard);
            }
            // Note that at this point we own the minimal ticket,
            // therefore our waiter must be first in the linked list.
            // The value of `waiter` is synchronized by `unfair_mutex`.
            self.waiters_head_ptr
                .store(waiter.next.get() as *mut Waiter, Relaxed);
        }
        TicketedMutexGuard {
            ticket,
            ticketed_mutex: self,
            payload: self.payload.get(),
        }
    }

    fn unlock(&self, ticket: usize) {
        // Use a wrapping add because that is what atomic fetch_add does.
        let next_ticket = ticket.wrapping_add(1);
        // Note that this variable is modified only while we hold the lock,
        // so no concurrent modification is possible.
        // Therefore there is no need for `fetch_add` or CAS here.
        self.serving.store(next_ticket, Release);
        // We change the type here because AtomicPtr doesn't support fetch_add.
        let casted_to_usize: &AtomicUsize = unsafe {
            assert_eq!(align_of::<AtomicPtr<Waiter>>(), align_of::<AtomicUsize>());
            assert_eq!(size_of::<AtomicPtr<Waiter>>(), size_of::<AtomicUsize>());
            &*(&self.waiters_head_ptr as *const AtomicPtr<_> as *const _)
        };
        // Check whether there are any waiters.
        //
        // We use `fetch_add(0, Release)` to get a Release load.
        // Since this operation has a Release store part, the store
        // to `serving` cannot be reordered after it.
        //
        // Now, let's prove that this cannot cause deadlocks.
        // 1. If the `waiters_head_ptr.swap` in `lock` happens before this `fetch_add`,
        //    we will see that the pointer is not null and will try to notify the waiter.
        // 2. If the `fetch_add` happens before the `swap`, this thread leaves without
        //    notifying the waiter, but since `fetch_add` has a Release store part,
        //    the Acquire load part of the `swap` synchronizes with it, so the waiter
        //    thread is guaranteed to see the new value of `serving` when checking the
        //    `while` loop condition and will not start waiting on the condvar.
        if casted_to_usize.fetch_add(0, Release) == 0 {
            // There are no waiting threads to notify,
            // so we return without acquiring the mutex
            // and without notifying anyone.
            // This is the 0-syscall case.
            return;
        }
        // Note that we need to hold this mutex until the thread is notified,
        // because otherwise we might notify it exactly between its checking of the
        // condition and its starting to wait on the condvar, in which case our
        // notification would be lost.
        let _guard = self.unfair_mutex.lock();
        // It is OK to use Relaxed ordering here because `unfair_mutex` synchronizes
        // the memory accesses to the linked list.
        let waiter_head: *const Waiter = self.waiters_head_ptr.load(Relaxed);
        if waiter_head.is_null() {
            // This can happen if the other thread saw the new `serving` value
            // right after creating its waiter.
            // It has progressed, so we don't need to notify it.
            return;
        }
        unsafe {
            // Since the linked list is ordered, it is enough to check the first element.
            if (*waiter_head).ticket != next_ticket {
                // This can happen in 2 situations:
                // 1. The next thread has already progressed successfully.
                // 2. It has not yet checked `serving` in the if statement,
                //    or it is blocked on `unfair_mutex`.
                //    In both cases it will see the new `serving`,
                //    so it will not wait on the condvar.
                //
                // Waking threads other than the one holding `next_ticket`
                // is the responsibility of the next thread.
                return;
            }
        }
        unsafe {
            // Since this condvar is unique to the waiting thread,
            // we don't need `notify_all`.
            (*waiter_head).condvar.notify_one();
        }
    }
}

// Returns the new head.
// # Safety
// Must be called inside the critical section.
// Elements in the linked list must be valid.
unsafe fn add_to_linked_list_ordered(head: *const Waiter, value: &Waiter) -> *const Waiter {
    if head.is_null() {
        return value;
    }
    if unsafe { (*head).ticket > value.ticket } {
        value.next.set(head);
        return value;
    }
    unsafe {
        let mut it = head;
        while !(*it).next.get().is_null() && (*(*it).next.get()).ticket < value.ticket {
            it = (*it).next.get();
        }
        // Splice `value` in: point it at the old successor first,
        // so the tail of the list is not lost on a mid-list insert.
        value.next.set((*it).next.get());
        (*it).next.set(value);
    }
    head
}