我在一个线程中有两个
std::sync::mpsc
接收器,一个优先接收器和一个常规接收器。有什么方法可以原子地确保在使用常规队列之前排空优先级队列?我确保在 Sender
上存在先行关系,因此优先级操作总是先发送。
这是我目前的解决方案,我几乎可以肯定这是不正确的:
loop {
for priority_op in priority_rx.try_iter() {
match priority_op {
...
}
}
// A priority op, and then a regular op could arrive here
let op = rx.recv().unwrap();
match op {
...
}
}
如果这是不可能的,我可以使用什么替代架构(除了只有一个队列)来实现这种行为?
在普通频道收到东西后可以查看优先频道
loop {
for priority_op in priority_rx.try_iter() {
use_priority(priority_op);
}
// A priority op, and then a regular op could arrive here
let op = rx.recv().unwrap();
let raced_priority = priority_rx.try_recv();
if let Ok(raced) = raced_priority {
// return op to queue
tx.send(op);
use_priority(raced);
} else {
// No race occurred
use_op(op);
}
}
这有一个很大的缺点,就是你失去了常规频道的排序。因此,您可以将常规操作放入局部变量中。
let mut regular = None;
loop {
for priority_op in priority_rx.try_iter() {
use_priority(priority_op);
}
// This op was received in the last loop
if let Some(event) = regular.take() {
use_op(event);
}
// A priority op, and then a regular op could arrive here
let op = rx.recv().unwrap();
let raced_priority = priority_rx.try_recv();
if let Ok(raced) = raced_priority {
// return op to queue
regular = Some(op);
use_priority(raced);
} else {
// No race occurred
use_op(op);
}
}
如果你改变循环,这可以写得更好。
loop {
let op = rx.recv().unwrap();
let raced_priority = priority_rx.try_recv();
let regular = if let Ok(raced) = raced_priority {
use_priority(raced);
// return op to queue
Some(op)
} else {
// No race occurred
use_op(op);
None
};
for priority_op in priority_rx.try_iter() {
use_priority(priority_op);
}
// This op was received before draining priority_rx
if let Some(event) = regular {
use_op(event);
}
}
为了保持对优先级队列的响应,您可能想要删除
recv
并使用recv_timeout
或try_recv
,并且您可能希望在优先级队列上这样做。
loop {
let regular = match rx.try_recv() {
Ok(op) => {
let raced_priority = priority_rx.try_recv();
if let Ok(raced) = raced_priority {
// return op to queue
use_priority(raced);
Some(op)
} else {
// No race occurred
use_op(op);
None
}
}
Err(TryRecvError::Empty) => None,
Err(_) => panic!("Disconnected"),
};
if let Ok(priority_op) = priority_rx.recv_timeout(some_duration) {
use_priority(priority_op);
for priority_op in priority_rx.try_iter() {
use_priority(priority_op);
}
}
// This op was received before draining priority_rx
if let Some(event) = regular {
use_op(event);
}
}
这意味着如果常规通道为空,优先通道将被尽快使用,如果优先通道为空,常规通道可能延迟最多
some_duration
(加上两者的循环开销)。如果你想经常检查队列,你可以使用 try_recv
而不是 recv_timeout
,并包含一个 spin_loop
提示。
写完才知道可以压缩
loop {
let regular = match rx.try_recv();
if let Ok(priority_op) = priority_rx.recv_timeout(some_duration) {
use_priority(priority_op);
for priority_op in priority_rx.try_iter() {
use_priority(priority_op);
}
}
// This op was received before draining priority_rx
if let Ok(event) = regular {
use_op(event);
}
}
还有旋转的版本。
loop {
let regular = match rx.try_recv();
for priority_op in priority_rx.try_iter() {
use_priority(priority_op);
}
// This op was received before draining priority_rx
if let Ok(event) = regular {
use_op(event);
}
spin_loop();
}
在这一点上,它与您的原始版本相比发生了很大变化,但我认为它应该可以满足您的需求。它还会忽略挂断的频道,因此您可能应该将其添加进去。
可能有更适合这个特定任务的数据结构,但我不知道,它不在标准库中,你可能必须从头开始构建它。如果确实存在,希望有人可以发布答案。