我有一些由同事编写的异步代码,我希望使用常规线程以非异步方式重写。这段代码在几个地方使用了 tokio::select!() ,我不确定如何将其转换为非异步代码。
这是一个例子:
async fn run<S>(
&self,
mut peer: S,
mut from_main_rx: tokio::sync::broadcast::Receiver<MainToPeerThread>,
) -> Result<()>
where
S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
<S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
<S as TryStream>::Error: std::error::Error,
{
loop {
tokio::select! {
// Handle peer messages
peer_message = peer.try_next() => {
// do stuff
}
// Handle messages from main thread
main_msg_res = from_main_rx.recv() => {
// do stuff
}
}
}
Ok(())
}
所以首要问题是:
一般来说,如何使用非异步代码重写 tokio select!() ?
还有额外的功劳:上面的函数会是什么样子?
在非异步上下文中,并没有真正与
select!
直接等效的东西。
Rust 的异步等待是一种协作式多任务处理形式,允许操作在无法取得进展时“屈服”。它还使用唤醒机制,提供一种方式来表明其中一个操作可以取得进展。 使用
select!
只是提供了一种在同一执行序列中生成多个操作的方法。如果一个操作发出要唤醒它的信号,
select!
将传递该信号并让它继续执行。如果两者都在等待某件事发生,那么 select!
同样会屈服,直到某事发生。有两种主要方法可以在非异步代码中实现类似的结果:
最直接的等效方法是尝试运行
try_next()
和
recv()
,但如果它们无法取得进展,则不会有太大作用。这将需要您所使用的类型提供的这些调用的特定非异步非阻塞版本。这样做可以在某种程度上获得与这样相同的效果(伪代码): loop {
if let Some(peer_message) = peer.non_blocking_next() {
// do stuff
}
if let Some(main_msg_res) = from_main_rx.non_blocking_recv() {
// do stuff
}
}
但请注意,这个循环现在根本不会产生任何结果,因此它会将 CPU 固定在 100%,同时只是等待。您可以通过引入一些
sleep
来限制其影响,但这会增加流处理的延迟。虽然使用非阻塞方法可能会使您的代码具有协作性(对于某些定义),但它仍然缺乏保持性能的唤醒机制。
使用一些recv_with_timeout()
方法可以工作,但仍然会遇到同样的问题,除非您同意一项操作获得优先级。无论哪种方式,它都不能很好地扩展或组合。
使用其他一些信令方法也可以,但这需要这些流的发送端付出更多的努力和协调来提醒数据存在(从而复制 async-await 已经做的事情)。总的来说,这可能是不可行的。
如果真正的目标只是同时处理两个流(更符合逻辑),那么您的非异步替代方案将是使用成熟的线程。在这种情况下,您将启动两个具有自己循环的线程,每个线程都使用阻塞方法来获取数据。像这样的东西(伪代码):
std::thread::scope(|s| {
s.spawn(|| {
loop {
let peer_message = peer.blocking_next();
// do stuff
}
});
s.spawn(|| {
loop {
let main_msg_res = from_main_rx.blocking_recv();
// do stuff
}
});
});
这与异步或非阻塞版本的做法并不完全相同,因为它使用更多的系统资源(通过使用多个执行线程),但它将确保尽快执行每个“do stuff” .
在您的特定代码中,我建议仅生成线程的后一种选择。更容易做到正确并保持性能,特别是如果这些是长期运行的作业。当然,这假设您已经拥有非异步流和已经弄清楚的接收器类型。