tokio::select!() 的非异步等价物是什么?

问题描述 投票:0回答:1

我有一些由同事编写的异步代码,我希望使用常规线程以非异步方式重写。这段代码在几个地方使用了 tokio::select!() ,我不确定如何将其转换为非异步代码。

这是一个例子:

async fn run<S>(
    &self,
    mut peer: S,
    mut from_main_rx: tokio::sync::broadcast::Receiver<MainToPeerThread>,
) -> Result<()>
where
    S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
    <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
    <S as TryStream>::Error: std::error::Error,
{
    loop {
        tokio::select! {
            // Handle peer messages
            peer_message = peer.try_next() => {
                // do stuff
            }

            // Handle messages from main thread
            main_msg_res = from_main_rx.recv() => {
                // do stuff
            }
        }
    }
    Ok(())
}

所以首要问题是:

一般来说,如何使用非异步代码重写 tokio select!() ?

还有额外的功劳:上面的函数会是什么样子?

multithreading asynchronous rust rust-tokio
1个回答
0
投票

在非异步上下文中,并没有真正与

select!
直接等效的东西。

Rust 的异步等待是一种协作式多任务处理形式,允许操作在无法取得进展时“屈服”。它还使用唤醒机制,提供一种方式来表明其中一个操作可以取得进展。 使用

select!

只是提供了一种在同一执行序列中生成多个操作的方法。如果一个操作发出要唤醒它的信号,

select!
将传递该信号并让它继续执行。如果两者都在等待某件事发生,那么
select!
同样会屈服,直到某事发生。
有两种主要方法可以在非异步代码中实现类似的结果:

  1. 尝试在同一个线程上做两件事:

    最直接的等效方法是尝试运行

    try_next()

    recv()
    ,但如果它们无法取得进展,则不会有太大作用。这将需要您所使用的类型提供的这些调用的特定非异步非阻塞版本。这样做可以在某种程度上获得与这样相同的效果(伪代码):
     loop {
         if let Some(peer_message) = peer.non_blocking_next() {
             // do stuff
         }
    
         if let Some(main_msg_res) = from_main_rx.non_blocking_recv() {
             // do stuff
         }
     }
    

    但请注意,这个循环现在根本不会产生任何结果,因此它会将 CPU 固定在 100%,同时只是等待。您可以通过引入一些 
    sleep

    来限制其影响,但这会增加流处理的延迟。虽然使用非阻塞方法可能会使您的代码具有协作性(对于某些定义),但它仍然缺乏保持性能的唤醒机制。

    使用一些 

    recv_with_timeout()

    方法可以工作,但仍然会遇到同样的问题,除非您同意一项操作获得优先级。无论哪种方式,它都不能很好地扩展或组合。

    使用其他一些信令方法

    也可以

    ,但这需要这些流的发送端付出更多的努力和协调来提醒数据存在(从而复制 async-await 已经做的事情)。总的来说,这可能是不可行的。

  2. 同时做两件事

    如果真正的目标只是同时处理两个流(更符合逻辑),那么您的非异步替代方案将是使用成熟的线程。在这种情况下,您将启动两个具有自己循环的线程,每个线程都使用阻塞方法来获取数据。像这样的东西(伪代码):

    std::thread::scope(|s| { s.spawn(|| { loop { let peer_message = peer.blocking_next(); // do stuff } }); s.spawn(|| { loop { let main_msg_res = from_main_rx.blocking_recv(); // do stuff } }); });

    这与异步或非阻塞版本的做法并不完全相同,因为它使用更多的系统资源(通过使用多个执行线程),但它将确保尽快执行每个“do stuff” .

  3. 总而言之,非异步函数不像异步函数那样合作,因此您可能必须做出一些让步。

在您的特定代码中,我建议仅生成线程的后一种选择。更容易做到正确并保持性能,特别是如果这些是长期运行的作业。当然,这假设您已经拥有非异步流和已经弄清楚的接收器类型。

© www.soinside.com 2019 - 2024. All rights reserved.