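In a very simple producer/consumer test, my consumer always gets an empty queue, and I can't figure out what is wrong. It is a very small test program, so I hope someone with a sharp eye can help me out here.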
#include <string>
#include <cstdio>
#include <cstdlib> /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>
std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;
void updateLog() {
    std::lock_guard<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
}
void produce() {
    srand (time(NULL));
    // produce every 5ms
    timespec ts = {0, 5*1000000};
    //
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
            nanosleep(&ts, NULL);
        }
    });
    printf("log thread created.");
}
void consume() {
    // consume every 10ms
    timespec ts = {0, 10*1000000};
    //
    while (true) {
        std::lock_guard<std::mutex> lock(gMutex);
        std::string log;
        unsigned int N = gMsgs.size();
        // consume all data in queue at the moment
        for (unsigned int m = 0; m < N; ++m) {
            log += gMsgs[m]+"\n";
        }
        // remove already consumed data
        for(unsigned int m=0; m<N; ++m) {
            gMsgs.pop_front();
        }
        if (log.empty()) {
            log = "EMPTY";
        }
        printf("log: %s\n", log.c_str());
        nanosleep(&ts, NULL);
    }
}
int main()
{
    produce();
    consume();
}
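So my producer runs in a background thread and pushes new strings into the queue at the faster rate (every 5 ms). My consumer, on the main thread, keeps draining the queue at a slower rate (every 10 ms).
I expect the queue never to be empty, so the consumer should always have something to consume.
The printed output is always "EMPTY", which means there is nothing in the queue.
What is wrong?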
I took the advice from @john and moved the nanosleep call outside the scope of the lock, but the result is almost the same. See the updated code:
// Example program
#include <string>
#include <cstdio>
#include <cstdlib> /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>
std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;
void updateLog() {
    std::lock_guard<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
}
void produce() {
    srand (time(NULL));
    int rnd = -1;
    timespec ts = {0, 5*1000000};
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
            nanosleep(&ts, NULL);
        }
    });
    printf("log thread created.\n");
}
void do_consume() {
    std::lock_guard<std::mutex> lock(gMutex);
    std::string log;
    unsigned int N = gMsgs.size();
    for (unsigned int m = 0; m < N; ++m) {
        log += gMsgs[m]+"\n";
    }
    for(unsigned int m=0; m<N; ++m) {
        gMsgs.pop_front();
    }
    if (log.empty()) {
        log = "EMPTY";
    }
    printf("log: %s\n", log.c_str());
}
void consume() {
    timespec ts = {0, 10*1000000};
    while (true) {
        do_consume();
        nanosleep(&ts, NULL);
    }
}
int main()
{
    produce();
    consume();
}
And the result:
log thread created.
log: EMPTY
log: hello: 2
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
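One detail in the code above may contribute to the mostly empty output: the lambda in produce() captures ts by reference ([&]), but ts is a local variable of produce() and is gone once produce() returns, so the producer thread passes a dangling pointer to nanosleep, which is undefined behavior. Below is a minimal sketch, assuming the goal is simply a producer that sleeps a fixed interval between pushes. It captures the interval by value and uses std::this_thread::sleep_for instead of nanosleep. The names startProducer, drain, and gStop are hypothetical and not part of the original program.

```cpp
#include <atomic>
#include <chrono>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::atomic<bool> gStop{false};  // hypothetical stop flag so the thread can be joined

// Start a producer that pushes one message per interval.
// The interval is captured BY VALUE, so it stays valid after this function returns.
std::thread startProducer(std::chrono::milliseconds interval) {
    return std::thread([interval]() {
        int n = 0;
        while (!gStop.load()) {
            {
                // Hold the lock only while touching the queue.
                std::lock_guard<std::mutex> lock(gMutex);
                gMsgs.push_back("hello: " + std::to_string(++n));
            }
            std::this_thread::sleep_for(interval);  // sleep without holding the lock
        }
    });
}

// Move everything currently queued into a local container and return it.
std::deque<std::string> drain() {
    std::deque<std::string> out;
    std::lock_guard<std::mutex> lock(gMutex);
    out.swap(gMsgs);
    return out;
}
```

A caller could start the thread with startProducer(std::chrono::milliseconds(5)), call drain() every 10 ms, and set gStop to true before joining the thread.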
Just a follow-up on the suggestions from @user4581301 and @PaulMcKenzie: std::condition_variable is indeed the way to go. I removed the nanosleep() calls.
Updated code:
// Example program
#include <condition_variable>
#include <string>
#include <cstdio>
#include <cstdlib> /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>
std::deque<std::string> gMsgs;
std::mutex gMutex;
std::condition_variable gNotify;
bool gConsumerCanConsume = false;
std::thread gThread;
void updateLog() {
    std::unique_lock<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
    gConsumerCanConsume = true;
    lock.unlock();
    gNotify.notify_one();
}
void produce() {
    srand (time(NULL));
    timespec ts = {0, 5*1000000};
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
        }
    });
    printf("log thread created.\n");
}
void do_consume() {
    {
        std::unique_lock<std::mutex> lock(gMutex);
        gNotify.wait(lock, []{return gConsumerCanConsume;});
    }
    std::string log;
    unsigned int N = gMsgs.size();
    for (unsigned int m = 0; m < N; ++m) {
        log += gMsgs[m]+"\n";
    }
    for(unsigned int m=0; m<N; ++m) {
        gMsgs.pop_front();
    }
    if (log.empty()) {
        log = "EMPTY";
    }
    printf("log: %s\n", log.c_str());
}
void consume() {
    timespec ts = {0, 10*1000000};
    while (true) {
        do_consume();
    }
}
int main()
{
    produce();
    consume();
}
And the result:
log thread created.
log: hello: 56
hello: 30
hello: 58
hello: 16
hello: 73
hello: 33
hello: 92
hello: 20
hello: 80
hello: 34
hello: 80
hello: 54
hello: 68
hello: 20
hello: 86
hello: 67
...