在 C++ 中的工作线程之间分割数据集计算

问题描述 投票:0回答:2

我有一个包含 1500 行的 2D 矢量数据集,我想对每一行执行计算量大的操作。我想利用多个线程来完成此任务,以便尽可能快地执行。

我在互联网上找不到任何关于如何解决此问题的明确解决方案。我想过为每一行创建一个线程,但这似乎效率很低,因为 1500 个线程会占用大量不必要的资源。因此,最好创建 6 个工作线程(我的电脑有 6 个核心)并分配工作。

//Computationally expensive function
double Loss(vector<double> input,vector<double> expectedOutput);

//This Calculates the Loss of all the 1500 rows in the 2D Vector and returns their average
double TotalLoss(vector<vector<double>> inputs,vector<vector<double>> expectedOutputs);

//TBD: Implement TotalLoss using multiple threads
double MultiThreadedTotalLoss(vector<vector<double>> inputs,vector<vector<double>> expectedOutputs);

我尝试使用 std::thread 来实现不同的线程,但不知道如何将工作分成 6 个。我想过制作 6 个不同的向量并划分原始向量,但不知道这是否是最好的可能接近。

如有任何帮助,我们将不胜感激。

c++ multithreading
2个回答
1
投票

鉴于您已经有一个计算一行损失的函数,我猜您的

TotalLoss
函数将如下所示:

using Matrix = std::vector<std::vector<double>>;

double TotalLoss(Matrix const &inputs, Matrix const &expected) {
    double ret = 0.0;

    for (int i=0; i<inputs.size(); i++) {
        ret += loss(inputs[i], expected[i]);
    }
    return ret;
}

假设这是相当准确的,多线程版本可能看起来像这样:

using Matrix = std::vector<std::vector<double>>;

double MultithreadedTotalLoss(Matrix const &inputs, Matrix const &expected) {
    double ret = 0.0;

    #pragma omp parallel for reduction(+:ret)
    for (int i=0; i<inputs.size(); i++) {
        ret += loss(inputs[i], expected[i]);
    }
    return ret;
}

#pragma 多一行,然后就可以了。就此而言,即使其中的

reduction(+:ret)
部分也是可选的,尽管它可以在很大程度上提高效率(它基本上告诉编译器/库保留每个线程累加器,然后在最后将它们加在一起,而不是让线程在运行时争夺对单个变量的访问)。如果
loss()
真的很贵,这可能不会有太大区别。

这比显式执行线程有一些优点。显而易见的是,与显式编写所有线程代码相比,它非常简单。不太明显,但通常同样重要的是,它可以/将自动查找(并使用)可用的核心数量,因此它将使用您拥有的六个核心,但如果您在一台具有 128 个核心的机器上运行它,它会自动使用所有这些。从长远来看,它还有一个优点,即继续保持所涉及的基本算法易于查找、易于阅读等。显式多线程很快就会被线程管理“东西”所主导,因此几乎很难找到真正起作用的代码。

最大的缺点是缺乏灵活性。对于这样的代码,OpenMP 可以很好地工作 - 但对于其他一些情况,应用起来要困难得多。


1
投票

您可以使用下面的线程池示例。它并不完美,但应该给你一个想法:-)

#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>

using namespace std::chrono_literals;

class ThreadPool
{
public:
    ThreadPool(int no_of_threads) : m_pool(no_of_threads)
    {
        for (int i = 0; i < no_of_threads; i++)
        {
            m_pool[i] = std::thread(&ThreadPool::thread_func, this, i);
            m_pool[i].detach();
        }
    }

    void add_task(std::function<void(int)> task_fn)
    {
        std::unique_lock<std::mutex> lck(m_mutex);
        m_task_queue.push(task_fn);
        m_cv.notify_all();
    }

    void stop_processing()
    {
        m_stop_all_threads = true;
    }
private:
    void thread_func(int thread_id)
    {
        while (!m_stop_all_threads)
        {
            std::unique_lock<std::mutex> lck(m_mutex);
            // wait for some task to be added in queue
            if (!m_cv.wait_for(lck, 100us, [this]() { return !m_task_queue.empty(); }))
                continue;
            // pick up task, update queue
            auto fn = m_task_queue.front();
            m_task_queue.pop();
            lck.unlock();
            // execute task
            fn(thread_id);
            // std::this_thread::sleep_for(1us);
        }
    }

    std::vector<std::thread> m_pool;
    std::atomic<bool> m_stop_all_threads{false};
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue< std::function<void(int)> > m_task_queue;
};

int main()
{    
    ThreadPool pool(5);

    int i = 0;
    while (i < 100)
    {
        pool.add_task([x = i](int id) { std::cout << "This is task " << x << " in thread " << id << '\n'; });
        i++;
    }
    std::this_thread::sleep_for(5ms);
    pool.stop_processing();
}

目前,

m_task_queue
正在等待
std::function<void(int)>
。您可以将其更改为您需要的任何签名。

© www.soinside.com 2019 - 2024. All rights reserved.