使用条件变量从多个生产者线程向消费者线程发出信号的最有效方法

问题描述 投票:0回答:1

我有 N 个工作线程,它们执行另一个消费者线程正在等待的计算。 我使用一个条件变量(CV)和一个从 N 开始的原子计数器来完成此操作,每个工作人员都会减少它,并且达到 0 的信号会向消费者发出使用 CV 的信号。 根据多个来源(example),“共享变量”(在我的例子中是计数器)需要在持有互斥锁的同时进行修改。此后,您可以在释放互斥锁后向 CV 发出信号。

我可以这样做,但这意味着每个工作线程都会尝试获取互斥锁,这可能会导致不必要的争用。我想在没有互斥体的情况下进行递减(因为它的原子性不应该是竞争条件),并且只获取最后一个工作线程的互斥体,即实际发送信号的线程。

工作示例:

constexpr int N_WORKERS = 10;

struct WorkData
{
    int input[N_WORKERS];
    int result[N_WORKERS];
    std::atomic<int> remainingWorkers;
    std::condition_variable cv;
    std::mutex mutex;  
};

void workerFunc(WorkData* wd, int index)
{
    // Do some calculation here
    wd->result[index] = wd->input[index] * wd->input[index];
    //-------------------------
    if (--wd->remainingWorkers == 0)
    {
        wd->mutex.lock();
        wd->mutex.unlock();
        
        wd->cv.notify_one();
    }
}

int main()
{
    WorkData wd;
    wd.remainingWorkers.store(N_WORKERS);

    std::thread workerThreads[N_WORKERS];
    for (int i = 0; i < N_WORKERS; i++)
    {
        wd.input[i] = i;
        wd.result[i] = 0;
        workerThreads[i] = std::thread(workerFunc, &wd, i);
    }
    // Wait for the worker threads to finish unless they already have
    if (wd.remainingWorkers.load() > 0)
    {
        std::unique_lock<std::mutex> lock(wd.mutex);
        while (wd.remainingWorkers.load() > 0)
            wd.cv.wait(lock);
    }
    // Consume result of calculations
    for (int i = 0; i < N_WORKERS; i++)
        std::cout << wd.input[i] << "^2 = " << wd.result[i] << std::endl;
    
    for (std::thread& t : workerThreads)
        t.join();
    return 0;
}

我知道我仍然需要从信号线程获取互斥锁,以确保信号不会在等待线程检查原子和进入条件变量等待的点之间发送,但我看不出为什么工作线程在获取互斥体之前无法修改计数器吗?

这安全吗?或者这里是否存在我错过的竞争条件?

c++ multithreading atomic race-condition condition-variable
1个回答
1
投票

存在竞争条件,因为可以在

wd.remainingWorkers.load() > 0)
wd.cv.wait(lock);
之间引发通知。这将导致
wait
永远无法完成,因为会错过通知。请参阅 https://en.cppreference.com/w/cpp/thread/condition_variable 其中指出:

即使共享变量是原子的,也必须在拥有互斥体的同时对其进行修改,才能正确地将修改发布到等待线程。

您可以通过分离剩余的工作线程数和工作已完成的事实来消除互斥锁争用。这样,只有完成最后计算的工作人员才需要锁定互斥体并发出通知:

#include <atomic>
#include <mutex>
#include <condition_variable>
#include <iostream>

constexpr int N_WORKERS = 10;

struct WorkData
{
    int input[N_WORKERS];
    int result[N_WORKERS];
    std::atomic<int> remainingWorkers;
    bool complete;
    std::condition_variable cv;
    std::mutex mutex;  
};

void workerFunc(WorkData* wd, int index)
{
    // Do some calculation here
    wd->result[index] = wd->input[index] * wd->input[index];
    //-------------------------
    if (--wd->remainingWorkers == 0)
    {
        wd->mutex.lock();
        wd->complete = true;
        wd->mutex.unlock();
        
        wd->cv.notify_one();
    }
}

int main()
{
    WorkData wd;
    wd.remainingWorkers.store(N_WORKERS);
    wd.complete = false;

    std::thread workerThreads[N_WORKERS];
    for (int i = 0; i < N_WORKERS; i++)
    {
        wd.input[i] = i;
        wd.result[i] = 0;
        workerThreads[i] = std::thread(workerFunc, &wd, i);
    }
    // Wait for the worker threads to finish unless they already have
    std::unique_lock<std::mutex> lock(wd.mutex);
    wd.cv.wait(lock, [&]{ return wd.complete; });
    // Consume result of calculations
    for (int i = 0; i < N_WORKERS; i++)
        std::cout << wd.input[i] << "^2 = " << wd.result[i] << std::endl;
    
    for (std::thread& t : workerThreads)
        t.join();
    return 0;
}
© www.soinside.com 2019 - 2024. All rights reserved.