为什么这段代码偶尔会挂在 std::condition_variable::wait() 上?

问题描述 投票:0回答:2

我已经在 C++ 中实现了一个非常标准的单消费者-多生产者模式,此外还有队列中任务数量的限制。

Worker 在单独的线程上运行消息队列。任务从生产者发送给工人。如果队列中已经有

max_num_tasks_
,生产者必须等待。

使用工具集 v143 在 VS2022 for windows x64 中编译。

以下代码零星地挂在

Worker::Send()
语句
cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
处。有人看到我做错了什么吗?

工人的实施

#pragma once
#include <queue>
#include <mutex>
#include <functional>

class Worker {
public:
    ~Worker() {
        Send([this] {done = true; });
        thd.join();
    }

    Worker(size_t max_num_tasks) : max_num_tasks_(max_num_tasks), done(false), thd([this] {
        while (!done) {
            mq.PopFront()();
            cv_.notify_all();
        }
        })
    { }

    void Send(std::function<void()>&& m) {
        {
            std::unique_lock<std::mutex> lock(m_);
            cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
        }
        mq.PushBack(std::move(m));
    }
private:
    bool done;
    size_t max_num_tasks_;
    ThreadSafeQueue<std::function<void()>> mq;
    std::thread thd;
    std::mutex m_;
    std::condition_variable cv_;
};

ThreadSafeQueue 的实现

#pragma once
#include <queue>
#include <mutex>
#include <functional>

template <typename T>
class ThreadSafeQueue {
public:
    void PushBack(T&& val) {
        {
            std::unique_lock<std::mutex> lock(q_mutex);
            q.push(std::move(val));
        }
        cv.notify_one();
    }

    void PushBack(const T& val) {
        {
            std::unique_lock<std::mutex> lock(q_mutex);
            q.push(val);
        }
        cv.notify_one();
    }

    T PopFront() {
        std::unique_lock<std::mutex> lock(q_mutex);
        cv.wait(lock, [&] { return (!q.empty()); });
        T v = q.front();
        q.pop();
        return std::move(v);
    }

    bool Empty() const {
        std::unique_lock<std::mutex> lock(q_mutex);
        return q.empty();
    }

    size_t Size() const {
        std::unique_lock<std::mutex> lock(q_mutex);
        return q.size();
    }
private:
    mutable std::mutex q_mutex;
    std::condition_variable cv;
    std::queue<T> q;
};

偶尔触发问题的单元测试示例:

    TEST(WorkerTests, Compute_PI) {

        std::atomic<double> value;
        auto multiply_by_pi_over_four = [&value] {
            int n = 750;
            double v = 0;
            for (int i = 0; i < n; i++) {
                v += std::pow(-1, i) / (2 * i + 1);
            }
            value = value * v;
        };

        auto add_pi_over_four = [&value] {
            int n = 750;
            double v = 0;
            for (int i = 0; i < n; i++) {
                v += std::pow(-1, i) / (2 * i + 1);
            }
            value = value + v;
        };

        auto add_one = [&value] {
            value = value + 1;
            std::this_thread::sleep_for(std::chrono::milliseconds(3));
        };

        auto multiply_by_three = [&value] {
            value = value * 3;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        };

        for (int i = 0; i < 100; i++) {
            value = 0.0;
            {
                Worker worker(1);
                worker.Send(add_one);
                worker.Send(multiply_by_pi_over_four);
                worker.Send(multiply_by_three);
                worker.Send([]() {});
                worker.Send(add_pi_over_four);
            }
            EXPECT_GE(3.15, value.load());
            EXPECT_LE(3.14, value.load());
        }
    }
c++ multithreading producer-consumer condition-variable stdthread
2个回答
3
投票

cv_
s 同步在显示的代码中被破坏。

cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });

wait()
执行的事件顺序如下:

  1. 互斥锁的初始状态是锁定的

  2. wait()
    条件被检查。

  3. 如果条件为假,则互斥量自动解锁并等待条件变量。

步骤 3 是一个原子的、不可分割的操作,但是步骤 2 与步骤 3 不是不可分割的。这是一个单独的步骤。

所以:

一个。第 2 步发生。

return (mq.Size() < max_num_tasks_);
评估为 false.

乙。 Worker 线程被唤醒并快速遍历

mq
中的所有内容,每次都向条件变量发出信号,耗尽
mq
直到它为空。

C。另一个线程从休眠中醒来,转到第 3 步,解锁互斥量并等待有人向条件变量发出信号。

D.没有任何东西会发出条件变量的信号。工作线程睡得很沉,而另一个执行线程反复按条件变量的喇叭,因为它很快排空了队列。

E。对

wait()
的调用现在等待条件变量发出信号,它永远不会。

基本上这个

wait()
需要使用same条件变量和互斥量,因为工作线程正在使用它来锁定它的线程。这种
wait()
队列低于其最大值的逻辑。大小,需要移入事件队列并使用相同的互斥锁和条件变量。


0
投票

我的第二次尝试如下:

class Worker {
public:
    ~Worker() {
        Send([this] {done = true; });
        thd.join();
    }

    Worker(size_t max_num_tasks) : max_num_tasks_(max_num_tasks), done(false), thd([this] {
        while (!done) {
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [&] { return (!mq.empty()); });
                mq.front()();
                mq.pop();
            }
            cv_.notify_all();
        }
        })
    { }

    void Send(std::function<void()>&& m) {
        {
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [&] {return (mq.size() < max_num_tasks_); });
                mq.push(std::move(m));
            }
            cv_.notify_all();
        }
    }
private:
    bool done;
    size_t max_num_tasks_;
    std::queue<std::function<void()>> mq;
    std::thread thd;
    std::mutex m_;
    std::condition_variable cv_;
};

然而,从性能的角度来看,使用两个 cv 以避免在 Send() 中不必要地唤醒所有消费者线程似乎是有益的。但也许这在宏伟的计划中可以忽略不计......

© www.soinside.com 2019 - 2024. All rights reserved.