我应该在线程池调度程序中使用信号量还是条件变量?

问题描述 投票:0回答:2

我不确定我是否应该使用 std::semaphore 或 std::condition 变量来处理线程池系统中的线程等待。

我认为使用信号量并将其递增到超出其编译器特定的 ::max() 的风险很低。

我正在使用 c++20

以下部分代码有些伪代码,但说明了程序结构。

class ThreadPool
{
private: 
     struct ThreadData {
     
     threadsafe_queue < fn2::unique_function<void() > > ThreadTask; //library that makes moveable function
     **std::std::counting_semaphore<1> ThreadSignal {0}; //This is what i'm unsure of**
     
     }


    std::vector<std::jthread> mThreads;
    std::vector<ThreadData> mThreadData;

    template < typename Func >
    void CreateTask(Func&& func) {

      //iterate over threads and distribute the tasks
      ...
      mThreadData[thread_id].ThreadTask.emplace_back(std::move(func));
      mThreadData[thread_id].ThreadSignal.release(); //tell the queue there is work to do

     }
public:
    //Functions that parse a function into the above CreateTask()
    ...

    ThreadPool() {
     //for the required number of threads create them with an internal lambda that handles the work
    ...
         mThreads.emplace_back(std::move(std::jthread([id = thread_id]() {
              while (running) {
                    **mThreadData[id].ThreadSignal.aquire(); //gets the go-signal, which is still the focus of this post**
                    //handle work, either by getting from internal queue or stealing other work
                    //thread might "over"-consume the work in general by stealing work, but if a thread runs through this part without any work to get it will eventually run into a semaphore block
              }
         }
     }
}

例如,如果程序将 1000 个任务分配给 2 个线程。主线程会迅速将信号量增加到一个非常高的数字。我不确定这是否有未定义行为的风险?我应该改为实施条件变量吗?一般来说,信号量应该比条件变量快,这是要保留它的一个原因。这也非常简单,因为我可以通过遍历信号量并在所有信号量上调用 release 来轻松激活所有信号量。

以前,如果没有任务从列表中弹出,我有一个简单的 this_thread::yield() 调用,但它最终只消耗了大量的 cpu 时间,这就是我喜欢线程的原因“暂停”并使用各种网关。

如果我误解了一些概念,请多多包涵,因为我还是个新手。

c++ multithreading synchronization semaphore condition-variable
2个回答
1
投票

对于线程队列,工作方式是让工作线程等待一个条件变量,该变量告诉条件队列为空,因此如果队列中有新消息,等待的工作线程可以启动并只要队列不为空,done worker 就可以继续。这样,只有队列必须进行计数。

您还可以通过添加一个条件变量来保护内存,该变量告诉队列已满,只要队列已满就让喂食线程等待。


0
投票

(如阿空加瓜所建议):

所以通过从 ThreadData 结构中移除信号量并添加一个成员变量 std::counting_semaphore<> mPoolSemaphore{0};到线程池(它有一个非常大的 ::max),最后修改 CreateTask() 函数以增加上面的信号量并制作一个循环(在每个线程内)是这样的:

while (!mDone) {
        mPoolSemaphore.aquire();        
                while (mPendingTasks.load(std::memory_order_acquire) > 0) {
                    while (auto task = mThreadData[id].WorkQueue.Pop()) {
                        mPendingTasks.fetch_sub(1,std::memory_order_release);
                                    task();


                    }
                    ///hereafter: stealing from other queues implementation below
                    ...
                }

}

这就够了吗?我知道这样做每个线程可能会在一个 while 循环中旋转而无事可做,但也许,只是也许,它会从另一个线程偷走一份工作。

© www.soinside.com 2019 - 2024. All rights reserved.