在SPSC框架中condition_variable.notify_one()不一致地发信号

问题描述 投票:0回答:1

生产者,在每个进入队列后,通过conditionVar.notify_one()向消费者发出信号。

但是,消费者在经过一定次数的随机推送(因此随后的notify_one()s,有时21有时2198,等等)之后醒来。在生产者中插入delay(sleep_for()或yield())无济于事。

如何使此SPSC锁定运行?

如何将此示例扩展到多个使用者(即SPMC)?

void singleProducerSingleConsumer() {
    std::condition_variable conditionVar;
    std::mutex mtx;
    std::queue<int64_t> messageQueue;
    bool stopped = false;
    const std::size_t workSize = 4096;

    std::function<void(int64_t)> producerLambda = [&](int64_t id) {
        // Prepare a random number generator and push to the queue
        std::default_random_engine randomNumberGen{};
        std::uniform_int_distribution<int64_t> uniformDistribution{};

        for (auto count = 0; count < workSize; count++){
            //Always lock before changing state guarded by a mutex and condition_variable "conditionVar"
            std::lock_guard<std::mutex> lockGuard{ mtx };

            //Push a random number onto the queue
            messageQueue.push(uniformDistribution(randomNumberGen));

            //Notify the consumer
            conditionVar.notify_one();
            //std::this_thread::yield();
            /*std::this_thread::sleep_for(std::chrono::seconds(2));
            std::cout << "Producer woke " << std::endl;*/
        }
        //Production finished
        //Acquire the lock, set the stopped flag, inform the consumer
        std::lock_guard<std::mutex> lockGuard {mtx };

        std::cout << "Producer is done!" << std::endl;

        stopped = true;
        conditionVar.notify_one();
    };

    std::function<void(int64_t)> consumerLambda = [&](int64_t id) {

        do {
            std::unique_lock<std::mutex> uniqueLock{ mtx };
            //Acquire the lock only if stopped or the queue isn't empty
            conditionVar.wait(uniqueLock, [&]() {return stopped || !messageQueue.empty(); });

            //This thread owns the mutex here; pop the queue until it is empty
            std::cout << "Consumer received " << messageQueue.size() << " items" << std::endl;
            while (!messageQueue.empty()) {
                const auto val = messageQueue.front(); messageQueue.pop();
                std::cout << "Consumer obtained: " << val << std::endl;
            }
            uniqueLock.unlock();

            if (stopped) {
                //Producer has signaled a stop
                std::cout << "Consumer is done!" << std::endl;
                break;
            }

        } while (true);
    };

    std::thread consumer{ consumerLambda, 1 };
    std::thread producer{ producerLambda, 2 };
    consumer.join();
    producer.join();

    std::cout << "singleProducerSingleConsumer() finished" << std::endl;
}
multithreading mutex producer-consumer condition-variable thread-synchronization
1个回答
0
投票

如果我理解正确,那么您希望在生产者产生下一个数字之前先消耗生产者产生的每个数字。从根本上讲,这是顺序执行,而不是并发执行。您可以通过简单地让生产者在常规函数调用中将值传递给消费者来最有效地完成顺序执行。结果将不是生产者消费者模式的好例子。

线程是由操作系统安排的,与计算机上同时执行的所有其他线程和进程竞争。生产者线程和使用者线程可能在同一CPU内核上运行,这意味着它们必须轮流执行操作系统计划的时间。由于使用者直到使用生产者写入数据之前都无法使用数据,因此只有在使用者有时间使用messageQueue中的值之前,生产者将在其第一个执行窗口中用几个值填充messageQueue才有意义。因此,messageQueue将被批量填充和卸载,直到程序完成。

您的解决方案应扩展为处理多个消费者。

© www.soinside.com 2019 - 2024. All rights reserved.