如何有效地按特定顺序获取互斥锁?

问题描述 投票:0回答:1

我需要使用外部确定的顺序在关键部分运行代码。

我目前提出了基于票证的锁,它利用互斥锁和 condvar 来使其他线程进入睡眠状态。不幸的是,它遇到了雷群问题

我当前的代码(简化的伪代码):

struct TicketMutex {
   ticket_dispenser: AtomicUsize, // start as 0
   ticket_serving: AtomicUsize, // start as 0
   unfair_mutex: Mutex,
   condvar: Condvar,
   data: *T,
}

// Note that it is called before locking a mutex.
// Mutex need to be locked in same order as resulted values.
// Called MUST call lock using returned value.
fn acquire_ticket(time_mutex: &TicketMutex) -> usize {
   time_mutex.ticket_dispenser.fetch_add(1, Relaxed)
}

// This acquires 
fn lock(time_mutex: &TicketMutex, ticket: usize){
   if ticket_mutex.ticket_serving.load(Acquire) == ticket {
      // Success!
      // 0 syscalls and writes here.
      return;
   }
   
   let unique_lock = time_mutex.unfair_mutex.lock();
   // Here thundering herd occurs because I wake all threads at once. ------ PROBLEM 1
   while ticket_mutex.ticket_serving.load(Acquire) != ticket {
       ticket_mutex.condvar.wait(unique_lock);
   }
   // Don't need to hold it during execution
   // because I track lock state by `ticket_serving` field.
   unique_lock.unlock();
}

fn unlock(time_mutex: &TicketMutex, ticket: usize){
   // Note that we can update `ticket_serving` non-atomically
   // because until that update we are in exclusive critical section.
   let next_ticket = ticket + 1;
   time_mutex.ticket_serving.store(next_ticket, Release);

   // The problem here that I always do 1-2 syscalls here    ------ PROBLEM 2
   // even if there is no contention at all (and no waiters).
   // All my attempts to put it behind some condition
   // lead to deadlocks.
   let unique_lock = time_mutex.unfair_mutex.lock();
   time_mutex.condvar.notify_all();
   unique_lock.unlock();
}
  1. 如何在
    lock
    函数中删除 condvars 的雷群?我正在考虑使用为每个线程分配单独的 condvar 作为局部变量,并将其放入带有头的侵入式链表中,但验证其正确性很棘手且很难。
  2. 至少在不满意的情况下,如何避免在
    unlock
    中进行系统调用?
  3. 也许还有其他方法来实现这个?我需要在锁定之前确定执行顺序,因为我想按照锁定另一个互斥体的顺序运行代码,我想在锁定此互斥体之前解锁该互斥体。
multithreading rust synchronization locking
1个回答
0
投票

我不知道这个问题怎么会被如此讨厌。无论如何,我自己想出了两个选择。

由于 C++ 不喜欢提问,所以我不会尝试编写伪代码,而只是 Rust。但是,为了简洁起见,我会省略互斥毒处理。

选项 1 更简单,它减少了惊群问题,但在锁定-解锁迭代中仍然始终使用至少一个系统调用。

选项 2 不那么简单,但如果没有争用,可以避免进行系统调用,并保证不会出现惊群情况。

选项 1:将条件变量数组放入互斥体中。

根据阵列的大小,这可以减少雷群问题的影响。 基本上,等待循环中的线程数除以数组大小,通过使用大数组可以轻松地将数组大小减少到 1。

// In struct
condvars: [Condvar; N],

// Waiting
if ticket != self.serving.load(Acquire) {
  let mut lock_guard = self.unfair_mutex.lock();
  while ticket != self.serving.load(Acquire) {
    lock_guard = self.condvars[ticket % N].wait(lock_guard);
  }
}

// unlocking
let next_ticket = ticket + 1;
self.serving.store(next_ticket, Release);
let _lock_guard = self.unfair_mutex.lock();
// Need to notify_all because if there are more than
// N threads, they may wait on same condvar.
self.condvars[next_ticket % N].notify_all();

选项 2:在堆栈上将每个线程的条件变量分开

它们被链接成侵入式链表,由互斥体中的原子指针指向。 它是安全的,因为这些值一直存在,直到成功获取锁为止,并且仅在持有互斥锁时才更新。

仅当 condvar 的创建很容易时此选项才可用。在 Windows 和 Linux 上,条件变量非常便宜,只需一次使用,因此可以按需创建它们。

该解决方案的主要优点:

  1. 如果没有等待线程,它不会通知任何条件变量,因此在无竞争的情况下不会进行系统调用。
  2. 它完全解决了雷群问题,因为它在解锁时最多通知 1 个线程。

我在满意的情况下测量了它的性能,与单个 condvar 上带有

notify_all
的代码相比,当线程数小于 CPU 核心数时,它的吞吐量增加了一倍,当 CPU 超额订阅(线程数多于 CPU 核心数)时,吞吐量提高了 5 倍).

struct TicketedMutex<T> {
    dispenser: AtomicUsize,
    serving: AtomicUsize,
    unfair_mutex: Mutex<()>,
    payload: UnsafeCell<T>,
    // It is a linked list of all sleeper threads
    // that need to be awoken when unlocking mutex.
    // They are ordered by ticket.
    // This pointer itself may be read outside of `unfair_mutex`
    // but elements must be read or updated only when it is locked.
    // Pointee must be updated inside unfair_mutex lcok.
    // It is important to note that each access of element in list is
    // guaranteed L1 cache miss because they are allocated
    // on stacks of different threads.
    waiters_head_ptr: AtomicPtr<Waiter>,
}

struct Waiter {
    condvar: Condvar,
    ticket: usize,
    next: Cell<*const Waiter>,
}

pub struct TicketedMutexGuard<'a, T> {
    ticket: usize,
    ticketed_mutex: &'a TicketedMutex<T>,
    payload: *mut T,
}

unsafe impl<T: Sync + Send> Sync for TicketedMutex<T> {}
unsafe impl<T: Sync + Send> Send for TicketedMutex<T> {}

impl<T> TicketedMutex<T> {
    pub fn new(val: T) -> Self {
        Self {
            dispenser: AtomicUsize::new(0),
            serving: AtomicUsize::new(0),
            unfair_mutex: Mutex::new(()),
            payload: UnsafeCell::new(val),
            waiters_head_ptr: AtomicPtr::new(core::ptr::null_mut()),
        }
    }

    // Caller must always invoke lock using acquired ticket at some point.
    // It must not call this method before acquiring lock.
    pub unsafe fn get_ticket(&self) -> usize {
        self.dispenser.fetch_add(1, Relaxed)
    }

    pub fn lock(&self, ticket: usize) -> TicketedMutexGuard<T> {
        if ticket != self.serving.load(Acquire) {
            let waiter = Waiter {
                condvar: Condvar::new(),
                ticket,
                next: Cell::new(core::ptr::null_mut()),
            };

            let mut guard = self.unfair_mutex.lock();

            // Sharing `waiter` with other threads is OK because
            // it would live until we acquire lock.
            // After we acquire lock, we remove it from list so other threads would not see it.
            let new_head = unsafe {
                let old_head: *const _ = self.waiters_head_ptr.load(Relaxed);
                add_to_linked_list_ordered(old_head, &waiter)
            }

            // This is a trick to implement `Acquire` store.
            // `swap` with memory ordering `Acquire` synchronizes us
            // with `Release` `fetch_add` in unlock, therefore preventing
            // reordering of `self.serving.load` in loop with this store.
            self.waiters_head_ptr.swap(new_head as *mut Waiter, Acquire);

            while ticket != self.serving.load(Acquire) {
                guard = waiter
                    .condvar
                    .wait(guard);
            }

            // Note that at this point we are owner of minimal ticket,
            // therefore our waiter must be first in linked list.
            // Value of `waiter` is synchonized by `unfair_mutex`.
            self.waiters_head_ptr
                .store(waiter.next.get() as *mut Waiter, Relaxed);
        }

        TicketedMutexGuard {
            ticket,
            ticketed_mutex: self,
            payload: self.payload.get(),
        }
    }

    fn unlock(&self, ticket: usize) {
        // Use wrapping add because that is what atomic fetch_add does.
        let next_ticket = ticket.wrapping_add(1);
        // Note that this variable is modified only when we keep lock
        // so no concurent modification is possible.
        // Therefore, there is no need in `fetch_add` or CAS here.
        self.serving.store(next_ticket, Release);

        // We change type here because AtomicPtr doesn't support fetch_add.
        let casted_to_usize: &AtomicUsize = unsafe {
            assert_eq!(align_of::<AtomicPtr<Waiter>>(), align_of::<AtomicUsize>());
            assert_eq!(size_of::<AtomicPtr<Waiter>>(), size_of::<AtomicUsize>());
            &*(&self.waiters_head_ptr as *const AtomicPtr<_> as *const _)
        };

        // Check if there is any waiters.
        //
        // We use fetch_add(0, Release) to make Release load.
        // Since this operation has Release store part, changing of serving
        // cannot be reordered with this operation.
        //
        // Now, let's prove that this cannot cause deadlocks.
        // 1. If `waiter_head.swap` in lock happens before `fetch_add`, we would see that pointer is not null,
        // and would try to notify waiter.
        // 2. If `fetch_add` happens before `swap`, this thread would leave without notifying the waiter
        // but since fetch_add has Release store part, Acquire load part of `swap` would synchronize with it,
        // so waiter thread is guaranteed to see the value of `serving` when checking condition of `while` loop
        // and would not start waiting on condvar.
        if casted_to_usize.fetch_add(0, Release) == 0 {
            // There is no waiting threads to wait
            // so let's return without trying to acquire mutex
            // and notify any threads.
            // 0 syscalls case.
            return;
        }

        // Note that we need to hold this mutex until notifying a thread
        // because otherwise we may notify it in exact moment between checking condition
        // and starting waiting on condvar, in which case our notification would be lost.
        let _guard = self.unfair_mutex.lock();

        // It is OK to use Relaxed ordering here because `unfair_mutex` synchronizes
        // memory accesses to the linked list.
        let waiter_head: *const Waiter = self.waiters_head_ptr.load(Relaxed);
        if waiter_head.is_null() {
            // This could happen if other thread have seen new serving after creating waiter.
            // It is progressed so we don't need to notify it.
            return;
        }

        unsafe {
            // Since linked list is ordered, it is enough to check the first element.
            if (*waiter_head).ticket != next_ticket {
                // This could happen in 2 situations:
                // 1. Next thread is successfully progressed.
                // 2. It has not yet checked serving in if statement or it is blocked on unfair_mutex.
                // In both cases, it would see new serving so would not wait on condvar.
                //
                // Waking threads that is not the one with next_ticket
                // is the responsibility of the next thread.
                return;
            }
        }

        unsafe {
            // Since this condvar is unique for a thread,
            // we don't need notify_all.
            (*waiter_head).condvar.notify_one();
        }
    }
}

// Returns new head.
// # Safety
// Must be called inside critical section.
// Elements in linked list must be valid.
unsafe fn add_to_linked_list_ordered(head: *const Waiter, value: &Waiter) -> *const Waiter {
    if head.is_null() {
        return value;
    }
    if unsafe { (*head).ticket > value.ticket } {
        value.next.set(head);
        return value;
    }

    unsafe {
        let mut it = head;
        while !(*it).next.get().is_null() && (*(*it).next.get()).ticket < value.ticket {
            it = (*it).next.get();
        }
        (*it).next.set(value);
    }
    head
}
© www.soinside.com 2019 - 2024. All rights reserved.