以非阻塞方式将数据从事件线程传递到持续运行的线程的最高效/优雅/稳健的解决方案 (C++20)

问题描述 投票:0回答:0

我正在进行一个研究项目,本质上是一个以 30 fps 显示一系列图像的 Windows 窗口。我最初一直在使用 GLFW。但是,API 的问题(尽管非常方便)是当用户通过单击并拖动窗口标题栏来移动窗口时,或者当用户将鼠标按住窗口的右下角时(调整手柄)不移动鼠标,窗口的内容不会刷新(由于 Windows 的模态性质,如果我做对了,它不会释放对这种特殊情况的关注)。

所以我需要创建两个线程:一个用于处理循环(处理图像并将它们每 1/30 秒显示到屏幕上)和一个处理事件(鼠标)。因为在这个项目中,我不能使用 GLFW 回调函数来捕获事件(我的约束),我需要使用本机 Windows API。但是为了简化代码,我将只是伪造事件的创建。这个想法是在主线程中捕获事件,将它们推送到队列中。然后负责处理和显示图像的线程获取事件队列,取消排队(并处理)事件,然后为图像做它的事情,等待 1/30 秒(减去做这些事情所花费的所有时间上面),继续前进。

下面的代码似乎有效。

  • 每次触发事件时,都会调用
    handle_event
    函数。它锁定互斥锁,并将事件添加到
    event_queue
    。当我们从函数返回时,互斥锁被解锁。
  • 处理线程,获取锁,事件出队列,处理事件等
#include <iostream>

#include <queue>
#include <chrono>
#include <mutex>
#include <condition_variable>

using namespace std::chrono_literals;

std::mutex m;

struct Event
{
    int msg;
};

std::queue<Event> event_queue;

void handle_event()
{
    std::cerr << "new event " << std::endl;
    std::lock_guard lock(m);
    event_queue.push({0});
}

using one_cycle = std::chrono::duration<std::int64_t, std::ratio<1, 30>>;
auto time_point = std::chrono::steady_clock::now() + one_cycle{0};

void constantly_running_proc()
{
    while (1) {
        {
            std::lock_guard lock(m);
            std::cerr << "event queue size: " << event_queue.size() << std::endl;
            while (!event_queue.empty()) {
                Event e = event_queue.back();
                event_queue.pop();
                //do_something_with_event(e);
            }
        }
        
        // process the events
        // ...

        // process the current frame
        // ...

        time_point += one_cycle{1};
        std::this_thread::sleep_until(time_point);
        
        //display_frame();
    }
}

int main() 
{
    std::thread t(constantly_running_proc);
    while (1) {
        handle_event();
        std::this_thread::sleep_for(100ms * rand() / (float)RAND_MAX);
    }

    return 0;
}
  1. 首先,这对您来说是否正确?你看到这段代码有什么问题吗?

  2. 虽然它在实践中似乎有效,但

    constantly_running_proc
    总是获取互斥锁以线程安全的方式访问事件队列,即使队列为空也是一种浪费。这可能没什么大不了的。但是因为对于这个练习,我试图尽可能地提高语言允许的效率(它需要在 1/30 秒内说出来,并且图像的处理可能很繁重 - 节省周期可能是一件好事在这种情况下。我也在学习。我一直在思考和尝试使用
    condition_variable
    。我试过这个:

std::condition_variable cv;

void handle_event()
{
    std::cerr << "new event " << std::endl;
    std::lock_guard lock(m);
    event_queue.push({0});
    cv.notify_one();
}

void constantly_running_proc2()
{
    while (1) {
        
        // process the current frame
        // ...

        time_point += one_cycle{1};
        std::unique_lock lock(m);
        if (cv.wait_until(lock, time_point, []() { return !event_queue.empty(); })) {
            std::cerr << "event queue size: " << event_queue.size() << std::endl;
            while (!event_queue.empty()) {
                Event e = event_queue.back();
                event_queue.pop();
                //do_something_with_event(e);
            }
            std::this_thread::sleep_until(time_point);
        }
        else {
            // timeout - nothing to do
        }
        
        //display_frame();
    }
}

我想这也行,但是在

std::this_thread::sleep_until
块内添加另一个
if (cv.wait_until)
似乎有点矫枉过正。

非常感谢向 C++ 专家/大师学习这个问题(我希望将来会有很多人愿意找到答案)。

编辑1

非常感谢@HansPassant(他拥有近 100 万的声誉;我什至认为这不可能存在))在评论中建议生产者-消费者模型(带有指向 wiki 页面的链接)。

Wiki 生产者-消费者模型

虽然我实现了 Wiki 页面上给出的模型的第一个版本,但我需要一个不同的实现。原因如下:

当消费者“跑得和生产者一样快”时,这个模型就起作用了。有时,生产者很忙,消费者用事件填满了缓冲区;有时,生产者并没有做太多事情,因此消费者有时间消费数据(并且缓冲区被清空)。一切都很好。

然而,在我的例子中,生产者只能每 1/30 秒“消费”一次数据。因此,如果生产者在短时间内产生大量事件,缓冲区将始终满,消费者将只有时间一次(每个周期一次)删除一个事件。

这里是演示这个的代码(取消注释

sleep_until
中的
constantly_running_proc
,看看当消费者“慢”时会发生什么):

#include <semaphore>
#include <thread>
#include <mutex>

#include <atomic>

#include <queue>
#include <iostream>

#include <chrono>

using namespace std::chrono_literals;

constexpr uint32_t N = 32; // max numb. of events that can be queued / and unqueued at once
std::counting_semaphore<N> number_of_queued_positions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manip;

struct Event
{
    int msg;
};

std::queue<Event> event_queue;

using frame = std::chrono::duration<std::int64_t, std::ratio<1, 30>>;
auto time_point = std::chrono::steady_clock::now() + frame{0};

std::atomic<bool> keep_running = true;

// consumer
void constantly_running_proc()
{
    while (keep_running) {
        // do some work that's roughly somewhere between 10 to 15ms
        std::this_thread::sleep_for(10ms + 5ms * rand() / (float)RAND_MAX);
        
        number_of_queued_positions.acquire(); // decr.
        {
            std::lock_guard lock(buffer_manip);
            std::cerr << "Num events (before): " << event_queue.size() << std::endl;
            //event_queue = std::queue<Event>();
            //while (!event_queue.empty()) {
            //  Event e = event_queue.back();
                event_queue.pop();
            //}
            std::cerr << "Num events (after): " << event_queue.size() << std::endl;
            
        }
        number_of_empty_positions.release();
        
        //time_point += frame{10};
        //std::this_thread::sleep_until(time_point);
        
        // display frame
        std::cerr << "consumer thread not blocked" << std::endl;
    }
}

int main()
{
    std::thread t(constantly_running_proc);
    uint32_t counter = 0;
    // producer
    while (1) {
        number_of_empty_positions.acquire(); // decrement counter or blocks until it can
        {
            std::lock_guard lock(buffer_manip);
            event_queue.push({0});
            std::cerr << "new event" << std::endl;
        }
        number_of_queued_positions.release(); // increment internal counter and unblocks acquires
        if (++counter > 2000000) 
            break;
        //std::this_thread::sleep_for(50ms * rand() / (float)RAND_MAX);
    }
    
    keep_running = false;
    
    return 0;
}

考虑到需求,这并不理想(假设事件是“调整窗口大小”。实现简单、优雅并且似乎有效)。尽管如此,它仍需要在每个消费者周期中完全清空事件队列。

我将尝试 wiki 中建议的其他解决方案(“无信号量或监视器”部分),这似乎可以解决此问题,但如果您到目前为止还有任何其他意见/反馈/建议,请...

c++ multithreading event-loop condition-variable
© www.soinside.com 2019 - 2024. All rights reserved.