防止记录器线程空转

问题描述 投票:0回答:1

我的程序在 4 个工作线程之间共享一个很长的文件列表(由用户提供)。每个工作人员都记录到一个简单的向量(logPool)。由于工人很忙,我选择让记录器不断旋转。这是连续版本:

struct logRec{ enum class type : unsigned { info = 0, error, end } typ;  /* end : marker for pool finalized, no-mo-msg */
               string msg; };
auto static const INFO = logRec::type::info, ERROR = logRec::type::error, END = logRec::type::end;
typedef vector<struct logRec> logPool;
vector<logPool> logPools;

//Print out log pools in order
void logger(){
  
  constexpr auto prLog = [](const size_t iPool, const size_t k) -> bool /*end reached ?*/ {
    switch(const auto& lrec = logPools[iPool][k]; lrec.typ){
      case INFO  : cout <<       lrec.msg      <<"\n"  <<std::flush; return false;
      case ERROR : cerr <<"\n"<< lrec.msg      <<"\n\n"<<std::flush; return false;
      [[unlikely]] case END : return true;
    }
    return true;
  };
  
  size_t last = 0, k = 0; bool end = false;
  for(auto i=0; i<logPools.size(); i++){ // sequential mode
    last = k = 0; end = false;
    while(!end){  // keep scanning pool #i until an end marker is reached
      for(k = last; k < logPools[i].size(); k++) if((end = prLog(i,k))) break;
      last = k;
    }
  }
}

没有与记录器线程同步:工作人员不知道它的存在,这使得代码非常简单。

另一方面,工作人员初始化/处理大文件/..所花费的所有时间都花在了记录器的旋转上(浪费资源)。

有更好的方法吗? (就像每 1000 条日志记录/300 毫秒左右向记录器发出信号......)

(在每个循环中访问所有日志池的非顺序版本不太可能空手而归,因此几乎不会空转)

c++ computer-science
1个回答
0
投票

这对我有用:通过条件变量发出信号

condition_variable logCV;
size_t logEvery = 1200; // every n files
auto logBreak = 1200ms; // scaled to work chunk size in main()
std::atomic<bool> threads_done{ false };

auto logger() noexcept -> void {
  
  constexpr auto prLog = [](const logRec& lrec) -> bool /*end reached ?*/ {
    switch(lrec.typ){
      case LOG_INFO  : cout <<       lrec.msg <<"\n"  ; return false;
      case LOG_ERROR : cerr <<"\n"<< lrec.msg <<"\n\n"; return false;
      [[unlikely]] case LOG_END : return true;
    }
    std::unreachable();
  };
  
  auto lk = unique_lock<mutex>( *(new mutex) );

  size_t sz = 0, last = 0, k = 0; bool end = false;
  for(auto i=0; i<logPools.size(); i++){ // sequential mode

    last = k = 0; end = false;
    while(!end){  // keep scanning pool #i until an end marker is reached
      last = k;
      if(not threads_done) logCV.wait_for(lk,logBreak); // coffee break
      // if threads are all done, you're behind buddy, speed through logPools, no waiting
      sz = logPools[i].size();
      if(sz==last and logPools[i][sz-1].typ == LOG_END) break;
      for(k = last; k < sz; k++) if( (end = prLog(logPools[i][k])) ) break;
      std::fflush(stdout); std::fflush(stderr);
    }

  }
}

void worker(){
  // std::lock_guard ; pick a work chunk, if any are left
  size_t done = 0;
  for(auto& file : workChunk) {
    // process file
    done++;
    if(done % logEvery == 0) logCV.notify_one();
  }
}

main(){
  ...
  // Run threads
  threads_done = true;
  ...
}

在多达 60,000 个工作项目上进行了测试,具有出色的性能。

© www.soinside.com 2019 - 2024. All rights reserved.