使用优化编译时,多线程程序块

问题描述 投票:1回答:2

我正在开发一个项目,我必须在管道中进行模拟(任意)计算。管道由阶段组成,每个阶段接受来自前一阶段的输入(除了第一阶段,直接从管道对象接收任务),进行计算并将结果发送到下一阶段。每个阶段都使用单独的执行线程实现。

管道应该具有基本的负载平衡能力:如果(一段时间后)它识别出两个连续阶段的执行时间之和小于最慢阶段的执行时间,它会“折叠”这两个阶段,即它使得它们都使用单个线程顺序运行。

项目中有三个类:类Pipeline和Stage是显而易见的,而类TSOHeap(Thread-Safe Ordered heap)是每个阶段输入中使用的缓冲区。它具有最大大小,并且能够为特殊消息提供最高优先级,以指示必须折叠舞台。

我的问题是:为什么如果我编译没有优化代码运行平稳(或至少不阻止),而如果我编译优化(-O2, -O3)程序块?如果我用调试器运行程序,它会阻塞几次;如果我从终端“正常”运行程序,它几乎总是阻塞。奇怪的是,一个线程阻塞在一条简单的打印线上。在我添加该打印之前(出于调试目的),程序在前一行上被阻塞,这是一个while循环的保护。

我猜这个问题与线程之间的同步有关,但我不知道如何发现故障部分。唯一的常量是程序在调用方法collapse_next_stage()之后阻塞,即在线程停止之后。

任何建议都会受到赞赏,甚至是发现这些错误的一般程序。我报告代码运行一个例子:

“TSOHeap.hpp”类:

#include <mutex>
#include <queue>
#include <vector>
#include <atomic>
#include <climits>
using namespace std;

template<typename T>
struct Comparator{
    bool operator()(pair<T,int> p1, pair<T,int> p2){
    return p1.second > p2.second;
    }
};

//Thread-Safe Ordered Heap
template<typename T>
struct TSOHeap
{
    TSOHeap(int _max=10):size{0},max{_max}{};
    ~TSOHeap(){}

    void push(T* item, int id){
        while(size==max);
        {
            lock_guard<mutex> lock(heap_mutex);
            heap.push(pair<T*,int>(item, id));
            size++;
        }        
    }

    pair<T*,int> pop(){
        while(size==0);
        {
            lock_guard<mutex> lock(heap_mutex);
            pair<T*,int> p = heap.top();
            heap.pop();
            size--;
            return p;
        }
    }

    priority_queue<pair<T*,int>, vector<pair<T*,int>>,Comparator<T*>> heap;
    atomic<int> size;
    int max;
    mutex heap_mutex;    
};

“Stage.hpp”类:

#include "TSOHeap.hpp"
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <mutex>
using namespace std;;

struct IStage{
    virtual void run() = 0; 
    virtual void wait_end() = 0;
    virtual void stage_func() = 0;
    virtual double get_exec_time() = 0;
    virtual void reset_exec_time()=0; 
    virtual void add_next(IStage&)=0;   
    virtual IStage* get_next() = 0;
    virtual void* get_input_ptr() = 0; 
    virtual void set_input(void*) = 0;
    virtual void collapse() = 0;
    virtual bool is_collapsed() = 0;
    virtual void collapse_next_stage() = 0;
    virtual int num_collapsed() = 0;
    ~IStage(){};   
};

template <typename Tin, typename Tf, typename Tout>
struct Stage : IStage{

Stage(Tf  function, int ind):fun{function}, input_ptr{new(TSOHeap<Tin>)},_end{false},
     next{nullptr}, collapsed{0}, i{ind}, exec_time{0.0},count{0},collapsing{false},c{0}{}; 

~Stage(){delete input_ptr;}

void stage_func(){ 
    Tin * input = input_ptr->pop().first;
    if (input!=nullptr){
        auto start = chrono::system_clock::now();   
        Tout out = fun(*input);  
        auto end = chrono::system_clock::now();
        chrono::duration<double> diff = end-start;
        set_exec_time(diff.count()); 
        if (next!=nullptr)
            next->set_input(new Tout(out));           
    }
    else
        _end = true;   
}

void run_thread(){
    while(!_end){
    cout << "t " << i << ", r " << ++c << endl; // BLOCKS HERE
        while(collapsing); //waiting that next stage finishes the remaining tasks
        stage_func();
        if(collapsed==1 && !_end)         
            next->stage_func();                         
    } 
    if(collapsed!=-1){  
        IStage * nptr = next;
        if(nptr!=nullptr && nptr->is_collapsed())     
            nptr = nptr->get_next();        
        if(nptr!=nullptr)
            nptr->set_input(nullptr);
    }
    else{
        while((input_ptr->size)>0)
            stage_func();
    }
}

void run()
{ 
    thread _t(&Stage::run_thread, this);
    t = move(_t);
    return;
}

void wait_end()
{
    t.join();
}

void set_input(void * iptr)
{
    input_ptr->push(static_cast<Tin*>(iptr), ++count);
}

void* get_input_ptr()
{
    return input_ptr;
}

void add_next(IStage &n)
{
    next = &n;
    output_ptr = static_cast<TSOHeap<Tout>*>(n.get_input_ptr());      
}

void collapse()
{       
    collapsed=-1;
    input_ptr->push(nullptr, INT_MIN);
// First condition is to avoid deadlock, in case this thread finished the execution in the meanwhile
    while(!_end && (input_ptr->size) > 0);      
}

bool is_collapsed()
{
    return collapsed==-1;
}

void collapse_next_stage()
{ 
    collapsing = true;  
    next->collapse();
    collapsed++;
    collapsing = false; 
    cout << "Stage # " << i << " has collapsed the successive Stage" << endl;
}

IStage* get_next()
{
    return next;
}

double get_exec_time()
{
    return exec_time;
}

void reset_exec_time()
{
    set_exec_time(0.0);
}

void set_exec_time(double value)
{
    lock_guard<mutex> lock(et_mutex);
    exec_time = value;
}

int num_collapsed()
{
    return collapsed;
}

Tf fun;   
TSOHeap<Tin> * input_ptr;
bool _end; 
IStage * next;
int collapsed;
int const i; 
double exec_time;
int count;
mutex et_mutex;  
bool collapsing;
int c;
TSOHeap<Tout> * output_ptr;
thread t; 
};

“Pipe.hpp”类:

#include "Stage.hpp"
#include <list>
#include <thread>
#include <algorithm>
using namespace std;;

template <typename Tin, typename Tout>
struct Pipe{

    Pipe(list<IStage*>li, int n_samples=10):slowest{-1},end{false},num_samples{n_samples}
    {
        for(auto& s:li)
            add_node(s);
    }

    void add_node(IStage* sptr)
    {
    if(!nodes.empty()) 
        nodes.back()->add_next(*sptr); 
    nodes.push_back(sptr);
    }

    void set_input(void * in_ptr)
    {
        nodes.front()->set_input(in_ptr);
    }

    int num_nodes()
    {
        return nodes.size();
    }

    void run()
    {
        for(auto &x: nodes)
        x->run();
    }

    void run(list<Tin>&& input)
    {
        thread t(&Pipe::run_manager, this, ref(input));
        while(!end)
            monitor_times();
        t.join();
    }

    void run_manager(list<Tin>& input)
    {
       run();
       for(auto& x:input)
           set_input(&x);   
       set_input(nullptr);
       end=true;
       for(auto& s : nodes)
           s->wait_end();
    } 

    void monitor_times()
    { // initialization phase
        vector<int> count;
        vector<double> avg; 
        vector<priority_queue<pair<double,int>, vector<pair<double,int>>,Comparator<double>>> measures;
        for(auto& x : nodes){
            count.push_back(0);
            avg.push_back(0);
            measures.push_back(priority_queue<pair<double,int>, 
                vector<pair<double,int>>,Comparator<double>>());
        }
        while(!end){
        // monitoring phase 
            for(int i=0; i<nodes.size(); i++){
                if(nodes[i]->get_exec_time()!=0){ 
                    pair<double,int> measure = pair<double,int>(nodes[i]->get_exec_time(),++count[i]);
                    nodes[i]->reset_exec_time();
                    measures[i].push(measure);
                    if(count[i]<=num_samples){
                        avg[i] = (avg[i]*(count[i]-1) + measure.first) / count[i]; 
                    }
                    else
                    {
                        double old = measures[i].top().first; 
                        // the ordering of the heap guarantees that I drop the oldest measure   
                        measures[i].pop(); 
                        avg[i] = (avg[i] * num_samples - old + measure.first) / num_samples;
                    }
                }       
            }
        // updating phase
            if(is_steady_state(count)){
                int slowest = get_slowest_stage(avg);
                for(int i=0; i<nodes.size()-1; i++){
                    if(avg[i]+avg[i+1]<avg[slowest]){
                        if(nodes[i]->num_collapsed()==0 && nodes[i+1]->num_collapsed()==0){
                            nodes[i]->collapse_next_stage();
                            break;    
                        }           
                    }       
                }       
            }
        }   
    }

    bool is_steady_state(vector<int>& count){
        for(auto& c: count){
            if(c < num_samples) return false;
        }
        return true;    
    }

    int get_slowest_stage(vector<double>& avg){
        double max = 0.0;
        int index = -1;
        for(int i=0; i<avg.size(); i++){
            if(avg[i]>max){
                max=avg[i];
                index = i;
            }
        }
        return index;
    }

    int slowest;
    bool end;
    int num_samples;
    vector<IStage*> nodes;
};  

类“main.cpp”:

#include<iostream>
#include<functional>
#include <chrono>
#include<cmath>
#include "Pipe.hpp"
using namespace std;;

auto f = [](int x){
    int c = 0;
    for(int i=0; i<300; i++)
        c=sin(i);
    return x;    
};

auto fast = [] (int x) {return x;};

auto fast_init = [](int x){
    if(x < 5)
        return x;

    int c=0;
    for(int i=0; i<300; i++)
        c=sin(i);    
    return x;    
};

auto print = [] (int x) {
    cout << "Result: " << x << " " << endl; 
    return x; 
};

int main(int argc, char* argv[])
{
    auto print_usage_msg = [&](){
    cout << "Usage: " << argv[0] << " <func_type> \n" <<
    "<func_type> = \n"
    " 0      to have 2 consecutive stages running the fast function\n"
    " 1      to have 2 consecutive stages running the fast function "
    "but after a short time reaching steady state " << endl;
    };

    if(argc!=2){
    print_usage_msg();
    return 1;
    }   

    int fun_code = atoi(argv[1]);

    if (fun_code!=0 && fun_code!=1){
        print_usage_msg();
    return 1;
    }   

    Stage<int,function<int(int)>,int> s1{f,1};
    Stage<int,function<int(int)>,int> s2{f,2};
    Stage<int,function<int(int)>,int> s3{f,3}; 
    Stage<int,function<int(int)>,int> s4{f,4};
    Stage<int,function<int(int)>,int> s5{f,5};
    Stage<int,function<int(int)>,int> s6{f,6};
    Stage<int,function<int(int)>,int> s7{f,7};
    Stage<int,function<int(int)>,int> sp{print,8};

    if(fun_code==0){
    s2.fun = fast;
    s3.fun = fast; 
    }
    else{
    s2.fun = fast_init;
    s3.fun = fast_init;
    }

    Pipe<int,int> p ({&s1, &s2, &s3, &s4, &s5, &s6, &s7, &sp});
    cout << "Pipe length: " << p.num_nodes() << endl;
    list<int> li {};
    for(int i=0; i<100; i++)
        li.push_back(i);
    p.run(move(li));
    return 0;
}

编译:

g++ main.cpp -std=c++11 -pthread -O3 -o gpipe -g 

运行:

./gpipe 1

谢谢你的帮助!

c++ multithreading optimization parallel-processing
2个回答
1
投票

想象一下单线程程序的以下代码:

void func()
{
    bool a = true;
    while(a)
    {
        // busy wait...
    }
}

这个功能会回归吗?显然不是。如果您是编译器,您将如何为此编写优化代码?

1: NOP
2: GOTO 1

这正是您使用这段代码所做的。两次。

while(!_end){  // here #1
    cout << "t " << i << ", r " << ++c << endl;
    while(collapsing)  // here #2
        ;  // for the love of God, move your semicolon here or use braces
    stage_func();
    if(collapsed==1 && !_end)         
        next->stage_func();                         
} 

您的编译器绝对没有义务意识到您正在进行多线程编程。 (这是你的工作告诉它)

编译器需要知道不要对_endcollapsed执行优化。不要使用volatile。为什么? volatile将阻止编译器优化变量,但是......嘿嘿...... CPU还可以从不同的线程中优化掉你对_endcollapsed的写入(通过将它们保存在缓存中而不是写入主内存)。编译器和CPU也将重新排序您的指令,这可能会导致类似的问题。

内存栅栏(也称为内存屏障)可用于指示CPU执行诸如推出挂起写入或重新更新其缓存值以进行读取之类的操作。它们还为命令重新排序提供指导。 AFAIK std :: atomic_thread_fence将阻止编译器重新排序,但我已经阅读了有关此问题的相互矛盾的事情......

到目前为止,最简单,最实用,最容易证明正确的事情就是将所有线程间通信变量切换为std :: atomic <>类型,这些类型包含内存屏障。所以

std::atomic<bool> _end; 
std::atomic<int> collapsed;

作为一般规则,线程之间共享的任何数据都应该由互斥锁保护,或者如果竞争条件不是问题(正如您使用简单信令那样),则应该是std :: atomic <>。如果你真的知道自己在做什么,并且真正了解架构,编译器和标准实现,那么你可以打破这个规则,但即使是专家也是如此。

顺便说一句,互斥锁的锁定和解锁操作都包含一个内存屏障,以防你担心。所以当你从TSOHeap得到一个指针时,那很好(假设你的TSOHeap实现是正确的......我没看过它)。


1
投票

使用TSOHeap时,你在size有种族条件。虽然size是原子的,但它是较大状态的一部分而非原子状态,因此size的变化与状态其余部分的变化不同步。

使size非原子化并仅在互斥锁被锁定时访问它。添加条件变量以通知在pushpop中等待的线程。

或者,完全删除size。例:

template<typename T>
struct TSOHeap
{
    TSOHeap(size_t _max=10): max{_max}{}

    void push(T* item, int id){
        unique_lock<mutex> lock(heap_mutex);
        while(heap.size() == max)
            cnd_pop.wait(lock);
        heap.push(pair<T*,int>(item, id));
        cnd_push.notify_one();
    }

    pair<T*,int> pop() {
        pair<T*,int> result = {};
        {
            unique_lock<mutex> lock(heap_mutex);
            while(heap.empty())
                cnd_push.wait(lock);
            bool notify = heap.size() == max;
            result = heap.top();
            heap.pop();
            if(notify)
                cnd_pop.notify_one();
        }
        return result;
    }

    mutex heap_mutex;
    condition_variable cnd_push, cnd_pop;
    priority_queue<pair<T*,int>, vector<pair<T*,int>>,Comparator<T*>> heap;
    size_t const max;
};
© www.soinside.com 2019 - 2024. All rights reserved.