How to let other futures be polled while one future's poll fn is blocked by a long-polling request


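For some reason I have to use tokio/futures 0.1 in my project, and I want to watch many consul services. When one future is blocked by a long-polling request, I want the other futures of the same kind to still be polled. How can I do that?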

Cargo.toml

[dependencies]
tokio = "0.1"
futures = "0.1"

My test case prints this is a test xxx once every 5 seconds:

this is a test name1
this is a test name2
this is a test name1
this is a test name2

use std::thread;
use std::thread::sleep;

use futures::{Async, Future, Stream, task};
use futures::future::join_all;
use tokio::runtime::current_thread;



pub struct JnsWatcher {
    name: String,
}

impl JnsWatcher {
    pub fn new(name: String) -> Self {
        JnsWatcher {
            name
        }
    }
}


impl Future for JnsWatcher {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        // Real code will be a long poll request, e.g.:
        // self.client.service(self.ws.service_name.as_str(), self.ws.tag.as_deref(), self.ws.passing_only.unwrap_or(true), Some(opt))
        sleep(std::time::Duration::from_secs(5));
        println!("this is a test : {}", self.name);
        task::current().notify();
        return Ok(Async::NotReady);
    }
}


#[test]
fn test_pool() {
    let w = JnsWatcher::new("name1".to_string());
    let w2 = JnsWatcher::new("name2".to_string());

    let t = thread::Builder::new()
        .name("watchtest".to_string())
        .spawn(move || {
            current_thread::block_on_all(join_all(vec![w, w2])).unwrap();
        })
        .expect("fail to spawn worker thread");


    t.join().unwrap();
    println!("test end");
}
asynchronous rust future
1 answer

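I suggest collecting the watchers into a single Stream and driving that stream, so that all of the futures are polled from one task. In futures 0.1 this can be done with futures::stream::futures_unordered; as far as I know, futures::stream::select_all for streams is only available from futures 0.3 onward.

Keep in mind that this only interleaves the polls. A poll that blocks the thread, like the sleep below, still delays the other futures on a single-threaded executor, so the real long-poll request needs to be non-blocking or run on another thread.

So the code might look like this.

    use std::thread;
    use std::thread::sleep;
    use std::time::Duration;

    use futures::{Async, Future, Stream, task};
    use futures::stream::futures_unordered;
    use tokio::runtime::current_thread;

    pub struct JnsWatcher {
        name: String,
    }

    impl JnsWatcher {
        pub fn new(name: String) -> Self {
            JnsWatcher {
                name
            }
        }
    }

    impl Future for JnsWatcher {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
            sleep(Duration::from_secs(5)); // simulate a long poll request
            println!("this is a test : {}", self.name);
            // Ask to be polled again, since this watcher never completes.
            task::current().notify();
            Ok(Async::NotReady)
        }
    }

    #[test]
    fn test_pool() {
        let watchers = vec![
            JnsWatcher::new("name1".to_string()),
            JnsWatcher::new("name2".to_string()),
            JnsWatcher::new("name3".to_string()),
        ];

        // FuturesUnordered is a Stream that yields each future's item as it completes.
        let stream = futures_unordered(watchers);

        let t = thread::Builder::new()
            .name("watchtest".to_string())
            .spawn(move || {
                // block_on_all expects a Future, so drain the stream with for_each.
                current_thread::block_on_all(stream.for_each(|_| Ok(()))).unwrap();
            })
            .expect("fail to spawn worker thread");

        t.join().unwrap();
        println!("test end");
    }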
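
One way to keep a blocking long poll from stalling the other watchers is to move each blocking call off the executor thread. The sketch below is only an illustration of that idea: it uses plain std threads and an mpsc channel instead of tokio 0.1, the names blocking_long_poll and watch_all are hypothetical, and the sleep stands in for the real consul request.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Stand-in for one blocking long-poll request (hypothetical helper; the
/// real code would call the consul client here instead of sleeping).
fn blocking_long_poll(name: &str, delay: Duration) -> String {
    thread::sleep(delay);
    format!("this is a test : {}", name)
}

/// Runs `rounds` long polls per watcher, each watcher on its own thread,
/// and returns the messages in the order they arrived.
fn watch_all(names: &[&str], rounds: usize, delay: Duration) -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    for &name in names {
        let tx = tx.clone();
        let name = name.to_string();
        thread::spawn(move || {
            for _ in 0..rounds {
                let msg = blocking_long_poll(&name, delay);
                // Sending fails only if the receiver has been dropped.
                if tx.send(msg).is_err() {
                    break;
                }
            }
        });
    }

    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);

    rx.iter().collect()
}
```

Calling watch_all(&["name1", "name2"], 2, Duration::from_secs(5)) should return four messages after roughly 10 seconds rather than 20, because the two watchers wait in parallel instead of one after the other. In a tokio 0.1 program the receiving side would need to be bridged back into a future or stream, which this sketch does not attempt.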