如何根据请求取消使用 std::async 生成的特定工作线程

问题描述 投票:0回答:1

我的应用程序需要根据请求生成多个工作线程。它还应该提供一个选项来取消基于请求(带有 ID)的长处理线程。

使用带有中断点的std::atomic_bool,所有工作线程都被取消,但无法取消请求的线程。如何实现取消特定工作线程而不是全部。

void doSomething(const int x, const int y, const int count, std::atomic_bool& isCancel)
{
    int myValue = 0;
    for (int itr=0; (itr < count) && !isCancel; itr++)
    {
        this_thread::sleep_for(chrono::milliseconds(100));
        //std::this_thread::sleep_for(std::chrono::seconds(4));
        myValue = x + y + itr;
        std::cout << "MyValue: " << myValue << std::endl;
    }

    if (isCancel)
    {
        std::cout << "cancel updated: exit with cancel exit " << std::endl;
    }
}

int main()
{
    std::atomic_bool cancel_token = ATOMIC_VAR_INIT(false);
    std::vector<std::future<void>> tasklist;
    std::vector<std::tuple<int, std::atomic_bool, std::future<void>>> mTaskList;

    for (int i = 0; i < 3; i++)
    {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        tasklist.push_back(std::async(std::launch::async, doSomething, 1, 2, 250, std::ref(cancel_token)));
    }

    std::this_thread::sleep_for(std::chrono::seconds(4));
    cancel_token = true;

    for (int i = 0; i < tasklist.size(); i++)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        tasklist[i].get();
    }

    return 0;
}

我正在尝试使用 std::async 取消特定的工作线程表单生成线程列表。

期望,我的代码应该能够取消特定的工作线程而不是全部。 谢谢你

c++ multithreading c++17 stdasync std-future
1个回答
0
投票

你可以做这样的事情。通过这种方法,您不仅可以取消任务,还可以通知它做一些工作。而且,它比互斥锁+条件变量快一点。

#include <iostream>
#include <thread>
#include <chrono>
#include <memory>
#include <string>

using namespace std::chrono_literals;

class cancellable_task
{
public:

    virtual ~cancellable_task() = default;

    bool is_running() const
    {
        return _state->flag & 0x01;
    }

    void cancel()
    {
        _state->flag = 0x00;
        _state->flag.notify_one();
    }

    void notify()
    {
        _state->flag |= 0x10;
        _state->flag.notify_one();
    }

    void wait()
    {
        _state->flag.wait(0x01);
        _state->flag &= 0x01;
    }
    
    virtual void operator()() = 0;

protected:

    cancellable_task():
        _state{new shared_state()}
    {}
    
private:

    struct shared_state
    {
        std::atomic<unsigned short> flag{0x01};
    };

    std::shared_ptr<shared_state> _state;

};


struct task: public cancellable_task
{
    int counter{0};
    
    void operator()() override
    {
        while (is_running())
        {
            std::cout << "iteration: " << counter++ << std::endl;
            wait();
        }
        std::cout << "cancelled" << std::endl;
    }
};
   
    
int main()
{
    task my_task;
    std::thread t{my_task};

    std::this_thread::sleep_for(1s);
    my_task.notify();
    std::this_thread::sleep_for(1s);
    my_task.notify();
    std::this_thread::sleep_for(1s);
    my_task.cancel();
    
    t.join();
    return 0;
}

输出:

iteration: 0
iteration: 1
iteration: 2
cancelled
© www.soinside.com 2019 - 2024. All rights reserved.