C ++双端队列使用者总是从生产者那里得到空队列

问题描述 投票:0回答:1

在一个非常简单的消费者/生产者测试中,我的消费者总是得到一个空队列。而且我不知道哪里错了。这是一个非常简单的测试程序,所以我希望有敏锐眼神的人可以在这里给我一些帮助。

#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;

void updateLog() {
    std::lock_guard<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
}

void produce() {
    srand (time(NULL));

    // produce every 5ms

    timespec ts = {0, 5*1000000};

    //
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
            nanosleep(&ts, NULL);
        }
    });
    printf("log thread created.");
}

void consume() {

    // consume every 10ms

    timespec ts = {0, 10*1000000};

    //

    while (true) {
        std::lock_guard<std::mutex> lock(gMutex);
        std::string log;
        unsigned int N = gMsgs.size();

        // consume all data in queue at the moment

        for (unsigned int m = 0; m < N; ++m) {
            log += gMsgs[m]+"\n";
        }

        // remove already consumed data

        for(unsigned int m=0; m<N; ++m) {
            gMsgs.pop_front();
        }

        if (log.empty()) {
            log = "EMPTY";
        }

        printf("log: %s\n", log.c_str());
        nanosleep(&ts, NULL);
    }
}

int main()
{
  produce();
  consume();
}


因此,我的生产者在后台线程中运行,并以更快的速度将新的字符串推入队列。我在主线程中的使用者以较低的速率不断将数据移出队列。

期望

我希望队列不应该是空的,并且消费者随时都应该吃点东西。

已观察

打印输出始终为“ EMPTY”,这意味着队列中没有任何内容。

怎么了?

更新

我从@john那里得到建议,并将nanosleep移出了锁止装置,但结果几乎相同,请参阅更新的代码。

// Example program

#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::thread gThread;

void updateLog() {
    std::lock_guard<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
}

void produce() {
    srand (time(NULL));

    int rnd = -1;
    timespec ts = {0, 5*1000000};
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
            nanosleep(&ts, NULL);
        }
    });
    printf("log thread created.\n");
}

void do_consume() {
    std::lock_guard<std::mutex> lock(gMutex);
    std::string log;
    unsigned int N = gMsgs.size();
    for (unsigned int m = 0; m < N; ++m) {
        log += gMsgs[m]+"\n";
    }
    for(unsigned int m=0; m<N; ++m) {
        gMsgs.pop_front();
    }

    if (log.empty()) {
        log = "EMPTY";
    }
    printf("log: %s\n", log.c_str());
}

void consume() {
    timespec ts = {0, 10*1000000};
    while (true) {
        do_consume();
        nanosleep(&ts, NULL);
    }
}

int main()
{
  produce();
  consume();
}

和结果

log thread created.
log: EMPTY
log: hello: 2

log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
log: EMPTY
c++ multithreading producer-consumer deque
1个回答
0
投票

只是想回应@ user4581301和@PaulMcKenzie的建议,确实是std::contional_variable是要走的路。我删除了nanosleep()通话。

更新的代码:

// Example program
#include <condition_variable>
#include <string>
#include <cstdio>
#include <cstdlib>     /* srand, rand */
#include <ctime>
#include <deque>
#include <mutex>
#include <thread>

std::deque<std::string> gMsgs;
std::mutex gMutex;
std::condition_variable gNotify;
bool gConsumerCanConsume = false;
std::thread gThread;

void updateLog() {
    std::unique_lock<std::mutex> lock(gMutex);
    char msg[256];
    int rnd = rand()%100 + 1;
    sprintf(msg, "hello: %d", rnd);
    gMsgs.push_back(std::string(msg));
    gConsumerCanConsume = true;
    lock.unlock();
    gNotify.notify_one();
}

void produce() {
    srand (time(NULL));

    timespec ts = {0, 5*1000000};
    gThread = std::thread([&]() {
        while(true) {
            updateLog();
        }
    });
    printf("log thread created.\n");
}

void do_consume() {
    {
        std::unique_lock<std::mutex> lock(gMutex);
        gNotify.wait(lock, []{return gConsumerCanConsume;});
    }

    std::string log;
    unsigned int N = gMsgs.size();
    for (unsigned int m = 0; m < N; ++m) {
        log += gMsgs[m]+"\n";
    }
    for(unsigned int m=0; m<N; ++m) {
        gMsgs.pop_front();
    }
    if (log.empty()) {
        log = "EMPTY";
    }
    printf("log: %s\n", log.c_str());
}

void consume() {
    timespec ts = {0, 10*1000000};
    while (true) {
        do_consume();
    }
}

int main()
{
  produce();
  consume();
}

和结果


log thread created.
log: hello: 56
hello: 30
hello: 58
hello: 16
hello: 73
hello: 33
hello: 92
hello: 20
hello: 80
hello: 34
hello: 80
hello: 54
hello: 68
hello: 20
hello: 86
hello: 67

...
© www.soinside.com 2019 - 2024. All rights reserved.