使用 tokio::time::timeout 不会捕获来自 TCPReadStream 的延迟响应

问题描述 投票:0回答:1

我正在向比特币服务器写入 PoC P2P 节点握手。 我向目标节点发送一条“版本”消息,目标节点会使用相应的版本消息进行响应。到目前为止一切顺利。

但是,来自目标节点的版本消息有时可能需要两分钟以上才能到达。服务器最终确实给出了成功的响应,但由于我已对其他网络活动应用了 5 秒超时,我认为为了保持一致性,我应该在这里执行相同的操作。

这是没有任何超时检查的代码。这很有效,尽管有时非常慢:

use bitcoin::{
    consensus::Decodable,
    p2p::message::{self},
};
use std::{
    io::BufReader,
    net::{IpAddr, SocketAddr, TcpStream},
    time::{Duration, SystemTime},
};

pub async fn handshake(to_address: IpAddr, port: u16, timeout: u64) -> Result<()> {
    let target_node = SocketAddr::new(to_address, port);

    let mut write_stream = TcpStream::connect_timeout(&target_node, Duration::from_secs(timeout))?;

    // Build and send version message
    send_msg_version(&target_node, &mut write_stream)?;

    let read_stream = write_stream.try_clone()?;
    let mut stream_reader = BufReader::new(read_stream);

    let response1 = message::RawNetworkMessage::consensus_decode(&mut stream_reader)?;

    match response1.payload() {
        // Do stuff here
    }

    // More stuff here...
}

consensus_decode()
的调用有时需要很长时间,因此我尝试将其包装在
future::ready()
中,然后将其放入对
tokio::time::timeout()
的调用中,如下所示:

    let response1 = if let Ok(net_msg) = tokio::time::timeout(
        Duration::from_secs(timeout),
        future::ready(message::RawNetworkMessage::consensus_decode(&mut stream_reader)?),
    )
    .await
    {
        net_msg
    } else {
        return Err(CustomError(format!(
            "TIMEOUT: {} failed to respond with VERSION message within {} seconds",
            to_address, timeout
        )));
    };

(&mut stream_reader)?
部分成功捕获任何解码错误,但如果消息成功到达,但速度太慢,
tokio::time::timeout
无法捕获它。

我在这里缺少什么?

谢谢

rust timeout p2p rust-tokio handshake
1个回答
0
投票

您不能使用

future::ready
使同步函数调用异步,请使用
tokio::task::spawn_blocking
代替:

use std::time::Duration;
fn takes_long() {
    std::thread::sleep(Duration::from_millis(100));
}

#[tokio::main]
async fn main() {
    let x = tokio::time::timeout(Duration::from_millis(10), tokio::task::spawn_blocking(|| {
        takes_long();
    })).await;
    eprintln!("{x:?}");
}

产生以下输出:

Err(Elapsed(()))

您的方法的问题在于,在调用

future::ready
之前(更不用说稍后调用
tokio::time::timeout
),函数
consensus_decode
必须已经返回并产生一个值,此时
future::ready
可以立即返回并且您的超时永远不会触发。

© www.soinside.com 2019 - 2024. All rights reserved.