我有一个
Consumer
类,应该每秒从队列中删除 n
个项目,但不再有:
class Consumer {
Consumer(int items_per_sec, Storage& storage) :
items_per_sec{items_per_sec}, storage{storage} {}
void consume() {
while (!storage.empty()) {
for (int i = 0; i < items_per_sec; ++i) {
storage.consume();
}
std::this_thread::sleep_for(1s);
}
}
private:
int items_per_sec;
Storage& storage;
};
这里,
Storage
是一个线程安全的队列抽象。假设还有一个 Producer
类向此存储添加项目,但它在某个时刻停止,并且只有一个生产者线程和一个消费者线程。
这段代码(至少)有两个问题,但我不知道如何解决它们:
Consumer
每秒不会正好消耗 n
个项目,因为它在处理 n
项目后会休眠一整秒(假设 n
不会太高,以至于处理 n
需要超过 1 秒的时间)
物品,但也不一定很小)。n
物品,Consumer
会尝试消耗太多次。实现“每秒做某事 n 次”的这种模式的正确方法是什么?
一般来说,这应该可行 - 您创建一个包含已处理项目的时间戳的容器。可能
std::deque
是一个很好的候选者(由于 pop_front()
和 push_back()
的存在)现在的算法是:
1 If processing queue is empty - sleep indefinitely on `std::condition_var`
2 Remove older than a second timestamps from the beginning of the `std::deque`
3 Process items in the queue and add timestamps to `std::deque`
4 If `std::deque` size reached `items_per_sec` sleep until the first timestamp + 1 second in `std::deque` and go to p 1
5 If queue is empty go to p 1
当您将元素添加到队列中时,如有必要,请通知另一个线程条件变量。因此,基本上将
std::deque
大小限制为 items_per_sec
并且仅保持时间戳更新于 1 秒即可完成这项工作。