我的工作线程实现 c++ 中有这个死锁问题

问题描述 投票:0回答:2

我正在尝试实现一个工作对象,它是一个等待任务并仅在销毁时终止的线程:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <iostream>

    
    
class Worker {
public:
    Worker(Worker&&) = delete;
    Worker(const Worker&) = delete;
    Worker() 
        : stop(false)
        , thread(&Worker::worker_thread, this)
    {}
    void push(std::function<void()> _f) {
        std::unique_lock lock(thread_mutex);
        task = std::move(_f);
        new_task.notify_one();
    }
    #if 0
    ~Worker() {
        std::unique_lock lock(thread_mutex);
        stop = true; 
        new_task.notify_one();
        lock.unlock();
        if (thread.joinable()) 
            thread.join();  
    } 
    #endif
private:
    void worker_thread() {
        std::unique_lock lock(thread_mutex);
        while (true) {
            new_task.wait(lock);
            if (stop) return;
            task();
        }
    }
    std::atomic<bool> stop;
    std::function<void()> task;
    std::thread thread;
    std::mutex thread_mutex;
    std::condition_variable new_task;
};

我在这里想出了这个目前不适用于 gcc 的例子:

int main() {
    Worker t;
    t.push([] { std::cout << "Hello from worker" << std::endl; });

    for (int i = 0; i < 10; ++i)
        t.push([i] { std::cout << i << std::endl; });

    return 0;
}

如果在实现析构函数的情况下进行编译,我会遇到死锁,并且控制台不会打印任何内容。 如果我没有编译(那么我也必须找到析构函数的问题,因为显然我需要它)然后我得到这个输出:

terminate called without an active exception
9
9
9
9
9

.
.
. and so on 

9

打印没有同步问题,因为是单线程完成的

所以这是我的代码的工作方式(应该):

当构造一个

Worker
对象时,它会产生一个执行
worker_thread
函数的线程。 此函数锁定
thread_mutex
,并且应该仅在等待条件变量时将其解锁。

当一个任务被推送时,

push
函数会尝试锁定互斥量,它应该只在它可以的时候,也就是当
worker_thread
正在等待带有条件变量的任务时。

所以如果线程正在等待,

push
应该能够获取锁并在
task
缓冲区中移动新任务,然后通知条件变量,唤醒线程。

我并不是在假装说我的代码是正确的,因为我自己感觉到这里有问题,但我就是不能指手画脚。

一个提示是这段代码(仍然没有析构函数):

int main() {
    Worker t;
    t.push([] { std::cout << "Hello from worker" << std::endl; });

    //for (int i = 0; i < 10; ++i)
    //    t.push([i] { std::cout << i << std::endl; });

    return 0;
}

永远说你好,好吧,在某些时候它会崩溃,基本上它和 for 循环做同样的事情,但应该只做一次然后等待下一个任务。

这更奇怪,因为我最初的想法是一个接一个地执行多个推送会出现问题,也许这可能会导致锁出现一些问题,但在最后一个示例中,我只调用了一次

push
并且仍然我有问题。

有人能明白问题出在哪里吗?

编辑:

class Worker {
public:
    Worker(Worker&&) = delete;
    Worker(const Worker&) = delete;
    Worker() 
        : stop(false)
        , task(nullptr)
        , thread(&Worker::worker_thread, this)
    {}
    void push(std::function<void()> _f) {
        std::unique_lock lock(thread_mutex);
        task = std::move(_f);
        new_task.notify_one();
    }
    #if 1
    ~Worker() {
        std::unique_lock lock(thread_mutex);
        stop = true; 
        new_task.notify_one();
        lock.unlock();
        if (thread.joinable()) 
            thread.join();  
    } 
    #endif
private:
    void worker_thread() {
        std::unique_lock lock(thread_mutex);
        while (true) {
            new_task.wait(lock, [this] { return task || stop; }); //
            if (stop) return;
            task();
            task = nullptr; // reset task for check
        }
    }
    bool stop; // does not need to be atomic
    std::function<void()> task;
    std::mutex thread_mutex;
    std::condition_variable new_task;
    std::thread thread; // moved to bottom
};

这是根据您的建议修改后的代码版本。 它似乎确实处理了 hello 部分,但是当涉及到 for 循环的第一次测试时,它只打印 9,没有 hello,如果我在 main 函数返回之前等待一秒钟,否则它不会打印任何东西:

int main() {
    Worker t;
    t.push([] { std::cout << "Hello from worker" << std::endl; });

    for (int i = 0; i < 10; ++i)
        t.push([i] { std::cout << i << std::endl; });


    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}

如果有人能弄清楚为什么它很受欢迎,否则没关系,你已经帮了我很多。

c++ multithreading mutex deadlock condition-variable
2个回答
3
投票
  1. 阅读有关初始化顺序的信息。当
    thread_mutex
    new_task
    尚未初始化时,您运行一个线程。使用未初始化的成员运行
    worker_thread
    是未定义的行为。
  2. Worker
    构造,推送任务,销毁可以在线程工作者启动之前发生,并且工作者永远等待条件变量。您应该首先使用相反方向的条件变量来向构造函数发出有关正在运行的工作人员的信号。

0
投票

这是适用于两个示例的解决方案:

class Worker {
public:
    Worker(Worker&&) = delete;
    Worker(const Worker&) = delete;
    Worker() 
        : stop(false)
        , task(nullptr)
        , thread(&Worker::worker_thread, this)
    {}
    void push(std::function<void()> _f) {
        std::unique_lock lock(thread_mutex);
        cv.wait(lock, [this] { return !task; });
        task = std::move(_f);
        new_task.notify_one();
    }
    ~Worker() {
        std::unique_lock lock(thread_mutex);
        cv.wait(lock, [this] { return !task; });
        stop = true; 
        new_task.notify_one();
        lock.unlock();
        if (thread.joinable()) 
            thread.join();  
    } 
private:
    void worker_thread() {
        std::unique_lock lock(thread_mutex);
        while (true) {
            cv.wait(lock, [this] { return task || stop; }); //
            if (stop) return;
            task();
            task = nullptr; // reset task for check
            new_task.notify_one();
        }
    }
    bool stop; // does not need to be atomic
    std::function<void()> task;
    std::mutex thread_mutex;
    std::condition_variable cv;
    std::thread thread; // moved to bottom
};

我遇到的主要问题是我不明白条件变量是如何工作的。

条件变量不等待信号,它等待条件。 所以偶尔条件变量会“唤醒”线程,检查条件是否满足是用户的责任。 使用条件变量时,检查条件很重要,否则它会时不时地唤醒并运行之后的操作。 所以这样做的一种方法是:

while (!condition)
    cv.wait(lock);

或者这个,使用 lambdas:

cv.wait(lock, [] { return condition; });

所以

cv.notify_one()
只是一个条件可能已经改变的提示,而不是唤醒线程的命令。

此外,我必须小心初始化,因为在我之前的代码中,线程是在条件变量和互斥锁之前初始化的,在这种情况下是否有所不同尚不清楚,它可能确实如此。 成员变量按照声明的顺序进行初始化。

最后,我还需要检查

push
和析构函数中的另一个条件。我需要在两者中都看到任务无效,或者设置为
0
NULL
.

这是因为如果设置为NULL,就意味着

push
函数可以安全的修改
worker_thread
未使用的任务。 析构函数做类似的事情,它需要在销毁之前查看线程是否执行完最后一个任务,将
stop
标志设置为
true
, 那是因为工作人员在执行任务之前检查是否设置了标志。

就这些,谢谢大家的热心帮助,希望这个问题对所有需要了解条件变量的程序员有所帮助

© www.soinside.com 2019 - 2024. All rights reserved.