在另一个之前自动耗尽一个 mpsc

问题描述 投票:0回答:1

我在一个线程中有两个

std::sync::mpsc
接收器,一个优先接收器和一个常规接收器。有什么方法可以原子地确保在使用常规队列之前排空优先级队列?我确保在
Sender
上存在先行关系,因此优先级操作总是先发送。

这是我目前的解决方案,我几乎可以肯定这是不正确的:

loop {
    for priority_op in priority_rx.try_iter() {
        match priority_op {
            ...
        }
    }
    
    // A priority op, and then a regular op could arrive here

    let op = rx.recv().unwrap();
    match op {
        ...
    }
}

如果这是不可能的,我可以使用什么替代架构(除了只有一个队列)来实现这种行为?

multithreading rust atomic
1个回答
1
投票

在普通频道收到东西后可以查看优先频道

loop {
    for priority_op in priority_rx.try_iter() {
        use_priority(priority_op);
    }

    // A priority op, and then a regular op could arrive here

    let op = rx.recv().unwrap();
    let raced_priority = priority_rx.try_recv();

    if let Ok(raced) = raced_priority {
        // return op to queue
        tx.send(op);
        use_priority(raced);
    } else {
        // No race occurred
        use_op(op);
    }
}

这有一个很大的缺点,就是你失去了常规频道的排序。因此,您可以将常规操作放入局部变量中。

let mut regular = None;
loop {
    for priority_op in priority_rx.try_iter() {
        use_priority(priority_op);
    }

    // This op was received in the last loop
    if let Some(event) = regular.take() {
        use_op(event);
    }

    // A priority op, and then a regular op could arrive here

    let op = rx.recv().unwrap();
    let raced_priority = priority_rx.try_recv();
    if let Ok(raced) = raced_priority {
        // return op to queue
        regular = Some(op);
        use_priority(raced);
    } else {
        // No race occurred
        use_op(op);
    }
}

如果你改变循环,这可以写得更好。

loop {
    let op = rx.recv().unwrap();
    let raced_priority = priority_rx.try_recv();

    let regular = if let Ok(raced) = raced_priority {
        use_priority(raced);
        // return op to queue
        Some(op)
    } else {
        // No race occurred
        use_op(op);
        None
    };

    for priority_op in priority_rx.try_iter() {
        use_priority(priority_op);
    }

    // This op was received before draining priority_rx
    if let Some(event) = regular {
        use_op(event);
    }
}

为了保持对优先级队列的响应,您可能想要删除

recv
并使用
recv_timeout
try_recv
,并且您可能希望在优先级队列上这样做。

loop {
    let regular = match rx.try_recv() {
        Ok(op) => {
            let raced_priority = priority_rx.try_recv();

            if let Ok(raced) = raced_priority {
                // return op to queue
                use_priority(raced);
                Some(op)
            } else {
                // No race occurred
                use_op(op);
                None
            }
        }
        Err(TryRecvError::Empty) => None,
        Err(_) => panic!("Disconnected"),
    };

    if let Ok(priority_op) = priority_rx.recv_timeout(some_duration) {
        use_priority(priority_op);
        for priority_op in priority_rx.try_iter() {
            use_priority(priority_op);
        }
    }

    // This op was received before draining priority_rx
    if let Some(event) = regular {
        use_op(event);
    }
}

这意味着如果常规通道为空,优先通道将被尽快使用,如果优先通道为空,常规通道可能延迟最多

some_duration
(加上两者的循环开销)。如果你想经常检查队列,你可以使用
try_recv
而不是
recv_timeout
,并包含一个
spin_loop
提示。

写完才知道可以压缩

loop {
    let regular = match rx.try_recv();

    if let Ok(priority_op) = priority_rx.recv_timeout(some_duration) {
        use_priority(priority_op);
        for priority_op in priority_rx.try_iter() {
            use_priority(priority_op);
        }
    }

    // This op was received before draining priority_rx
    if let Ok(event) = regular {
        use_op(event);
    }
}

还有旋转的版本。

loop {
    let regular = match rx.try_recv();

    for priority_op in priority_rx.try_iter() {
        use_priority(priority_op);
    }

    // This op was received before draining priority_rx
    if let Ok(event) = regular {
        use_op(event);
    }

    spin_loop();
}

在这一点上,它与您的原始版本相比发生了很大变化,但我认为它应该可以满足您的需求。它还会忽略挂断的频道,因此您可能应该将其添加进去。

可能有更适合这个特定任务的数据结构,但我不知道,它不在标准库中,你可能必须从头开始构建它。如果确实存在,希望有人可以发布答案。

© www.soinside.com 2019 - 2024. All rights reserved.