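My program shares a very long list of files (supplied by the user) among 4 worker threads. Each worker logs to a simple vector (a logPool). Since the workers are busy, I chose to let the logger spin continuously. This is the sequential version: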
#include <iostream>
#include <string>
#include <vector>
using namespace std;

struct logRec {
    enum class type : unsigned { info = 0, error, end } typ;  // end: marker meaning the pool is finalized, no more messages
    string msg;
};
static constexpr auto INFO = logRec::type::info, ERROR = logRec::type::error, END = logRec::type::end;
using logPool = vector<logRec>;
vector<logPool> logPools;

// Print out the log pools in order, one pool after another
void logger() {
    constexpr auto prLog = [](const size_t iPool, const size_t k) -> bool /* end reached? */ {
        switch (const auto& lrec = logPools[iPool][k]; lrec.typ) {
            case INFO:  cout << lrec.msg << "\n" << std::flush; return false;
            case ERROR: cerr << "\n" << lrec.msg << "\n\n" << std::flush; return false;
            [[unlikely]] case END: return true;
        }
        return true;
    };
    size_t last = 0, k = 0; bool end = false;
    for (size_t i = 0; i < logPools.size(); i++) {   // sequential mode
        last = k = 0; end = false;
        while (!end) {   // keep scanning pool #i until an end marker is reached
            for (k = last; k < logPools[i].size(); k++) if ((end = prLog(i, k))) break;
            last = k;
        }
    }
}
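One caveat with this design: the logger reads `logPools[i]` (both `size()` and the elements) while the worker that owns that pool may be calling `push_back` on it. In C++ that is a data race, and a `push_back` that reallocates invalidates the element the logger is reading. A minimal sketch of one common fix, assuming one mutex per pool (`LogRec`, `LockedPool`, `push` and `take` are hypothetical names, not from the code above): the worker appends under the lock, and the logger swaps out everything pending in one short critical section, then prints without holding the lock.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record type mirroring logRec from the question.
struct LogRec {
    enum class Type : unsigned { info = 0, error, end } typ;  // end: no more messages
    std::string msg;
};

// Hypothetical per-worker pool guarded by its own mutex.
class LockedPool {
public:
    // Worker side: append one record under the lock.
    void push(LogRec rec) {
        std::lock_guard<std::mutex> lk(m_);
        recs_.push_back(std::move(rec));
    }

    // Logger side: take everything appended so far in one short critical
    // section; the caller prints the returned batch without holding the lock.
    std::vector<LogRec> take() {
        std::vector<LogRec> batch;
        {
            std::lock_guard<std::mutex> lk(m_);
            batch.swap(recs_);
        }
        return batch;
    }

private:
    std::mutex m_;
    std::vector<LogRec> recs_;
};
```

Because `std::mutex` is neither copyable nor movable, a container of these pools has to be sized up front (for example `std::vector<LockedPool> pools(4);`) and must not be resized once the threads are running.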
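There is no synchronization with the logger thread: the workers don't know it exists, which keeps the code very simple.
On the other hand, all the time the workers spend initializing, processing large files, etc. is time the logger spends spinning (wasted resources).
Is there a better way? (Something like signaling the logger every 1000 log records or every 300 ms or so...)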
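(A non-sequential version that visits all log pools on every pass is unlikely to come up empty-handed, so it would hardly ever spin idly.)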
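"Wake the logger every N records or every T ms, whichever comes first" maps directly onto the predicate overload of `std::condition_variable::wait_for`. A sketch assuming a record counter guarded by a mutex (`LogSignal`, `recordLogged`, `waitForBatch` and `finish` are hypothetical names): the predicate re-checks the condition after spurious wakeups and returns immediately if a batch is already pending, while the timeout bounds how long a partial batch can sit unprinted.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper: lets a consumer (the logger) sleep until `batch`
// records have been logged, the producers finish, or a timeout expires.
class LogSignal {
public:
    explicit LogSignal(std::size_t batch) : batch_(batch) {}

    // Producer side: call once per appended log record.
    void recordLogged() {
        bool wake = false;
        {
            std::lock_guard<std::mutex> lk(m_);
            wake = (++pending_ == batch_);   // notify once per completed batch
        }
        if (wake) cv_.notify_one();
    }

    // Producer side: call once when all workers are done.
    void finish() {
        {
            std::lock_guard<std::mutex> lk(m_);
            done_ = true;
        }
        cv_.notify_all();
    }

    // Consumer side: blocks until a full batch is pending, finish() was
    // called, or `timeout` elapses (spurious wakeups are re-checked by the
    // predicate). Returns how many records were signalled since the last call.
    std::size_t waitForBatch(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait_for(lk, timeout, [this] { return pending_ >= batch_ || done_; });
        std::size_t n = pending_;
        pending_ = 0;
        return n;
    }

    bool finished() {
        std::lock_guard<std::mutex> lk(m_);
        return done_;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    const std::size_t batch_;
    std::size_t pending_ = 0;
    bool done_ = false;
};
```

With something like this, the logger's sleep could be `waitForBatch(300ms)` and each worker would call `recordLogged()` per record; the counter only decides when to wake, while the records themselves still need a thread-safe pool.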
This works for me: signaling via a condition variable.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <utility>   // std::unreachable (C++23)
using namespace std::chrono_literals;

condition_variable logCV;
size_t logEvery = 1200;     // notify the logger every n files
auto logBreak = 1200ms;     // scaled to the work chunk size in main()
std::atomic<bool> threads_done{ false };

auto logger() noexcept -> void {
    constexpr auto prLog = [](const logRec& lrec) -> bool /* end reached? */ {
        switch (lrec.typ) {
            case INFO:  cout << lrec.msg << "\n"; return false;
            case ERROR: cerr << "\n" << lrec.msg << "\n\n"; return false;
            [[unlikely]] case END: return true;
        }
        std::unreachable();
    };
    mutex logMutex;                    // only the logger waits on logCV, so a local mutex is enough
    unique_lock<mutex> lk(logMutex);
    size_t sz = 0, last = 0, k = 0; bool end = false;
    for (size_t i = 0; i < logPools.size(); i++) {   // sequential mode
        last = k = 0; end = false;
        while (!end) {   // keep scanning pool #i until an end marker is reached
            last = k;
            if (not threads_done) logCV.wait_for(lk, logBreak);   // coffee break
            // once all threads are done, the logger is behind: speed through logPools without waiting
            sz = logPools[i].size();
            // sz != 0 keeps sz - 1 from underflowing while the pool is still empty
            if (sz != 0 and sz == last and logPools[i][sz - 1].typ == END) break;
            for (k = last; k < sz; k++) if ((end = prLog(logPools[i][k]))) break;
            std::fflush(stdout); std::fflush(stderr);
        }
    }
}
void worker() {
    // under a std::lock_guard: pick a work chunk, if any are left
    size_t done = 0;
    for (auto& file : workChunk) {
        // process file
        done++;
        if (done % logEvery == 0) logCV.notify_one();   // wake the logger every logEvery files
    }
}
int main() {
    ...
    // Run threads
    threads_done = true;   // workers have finished: the logger stops taking breaks
    ...
}
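The order of operations around `threads_done` matters: the logger may already be inside `wait_for` when the flag is set, so it can sleep for up to one more `logBreak` unless it is notified. The self-contained sketch below (`runPipeline` and its parameters are hypothetical, and a single shared queue stands in for the per-worker pools) starts the logger, runs the workers, joins them, sets the done flag under the logger's mutex, notifies, and only then joins the logger. It returns how many lines the logger consumed.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical sketch: one logger thread drains a shared queue filled by
// nWorkers worker threads; returns how many log lines the logger consumed.
std::size_t runPipeline(int nWorkers, int itemsPerWorker, std::size_t batchSize) {
    using namespace std::chrono_literals;
    std::mutex m;
    std::condition_variable cv;
    std::vector<std::string> queue;   // pending log lines, guarded by m
    bool done = false;                // plays the role of threads_done, guarded by m
    std::size_t consumed = 0;         // touched only by the logger until it is joined

    std::thread logger([&] {
        std::unique_lock<std::mutex> lk(m);
        for (;;) {
            // Sleep until a batch is ready, the workers are done, or 300 ms pass.
            cv.wait_for(lk, 300ms, [&] { return queue.size() >= batchSize || done; });
            std::vector<std::string> batch;
            batch.swap(queue);
            const bool finished = done;
            lk.unlock();
            consumed += batch.size();     // a real logger would print `batch` here
            if (finished) return;         // nothing can be queued after done is set
            lk.lock();
        }
    });

    std::vector<std::thread> workers;
    for (int w = 0; w < nWorkers; ++w) {
        workers.emplace_back([&, w] {
            for (int i = 0; i < itemsPerWorker; ++i) {
                bool wake = false;
                {
                    std::lock_guard<std::mutex> lk(m);
                    queue.push_back("worker " + std::to_string(w) + ": item " + std::to_string(i));
                    wake = queue.size() >= batchSize;
                }
                if (wake) cv.notify_one();
            }
        });
    }

    for (auto& t : workers) t.join();   // 1. all work finished
    {
        std::lock_guard<std::mutex> lk(m);
        done = true;                    // 2. publish "done" under the logger's mutex
    }
    cv.notify_one();                    // 3. cut the logger's current wait short
    logger.join();                      // 4. logger drains the rest and exits
    return consumed;
}
```

Writing the flag under the same mutex the logger waits on means the final `notify_one` cannot slip in between the logger's predicate check and the start of its wait, so shutdown does not depend on the timeout.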
Tested with up to 60,000 work items, with excellent performance.