流返回错误时如何将未来添加回流

问题描述 投票:0回答:0

出于某种原因,我必须在我的项目中使用 futures/tokio 0.1。现在我想要一个无限循环来为很多不同的手表做一个长轮询请求。
无论发生什么Err,比如网络问题,超时,http响应错误等等,我都会使用响应数据来发起新的请求。

但是现在当发生错误时,stream 不会进入 for_each 分支,它会进入 map_err,然后退出。当它进入 map_err 和 runtime.block_on 继续轮询时如何添加一个未来?

[dependencies]
tokio = "0.1"
futures= "0.1"
hyper = { version = "0.12.20" }

我的演示:

// #![deny(warnings)]
extern crate hyper;
extern crate pretty_env_logger;

use cipher::crypto_common::Output;
use futures::future::join_all;
use futures::{future, Async};
use std::env;
use std::fmt::Error;
use std::io::{self, Write};
use std::ops::Add;
use std::pin::Pin;
use std::time::{Duration, Instant};

use hyper::client::HttpConnector;
use hyper::rt::{self, Future, Stream};
use hyper::Client;
use tokio::runtime::current_thread;
use tokio::runtime::current_thread::Runtime;

fn main() {
    poll_long_poll_request();
}

#[derive(Debug)]
pub enum SomeError {
    Http(hyper::Error),
    Json(serde_json::Error),
}

impl From<hyper::Error> for SomeError {
    fn from(err: hyper::Error) -> SomeError {
        SomeError::Http(err)
    }
}

impl From<serde_json::Error> for SomeError {
    fn from(err: serde_json::Error) -> SomeError {
        SomeError::Json(err)
    }
}

fn long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> impl Future<Item = Result<Vec<String>, SomeError>, Error = SomeError> {
    // let client = Client::new();

    client
        // Fetch the url...
        .get(url)
        // And then, if we get a response back...
        .and_then(|res| {
            println!("Response: {}", res.status());
            println!("Headers: {:#?}", res.headers());

            res.into_body().concat2()
        })
        .from_err::<SomeError>()
        .and_then(move |(body)| {
            println!("body: {:?}", body);
            let kvs: Vec<String> = serde_json::from_slice(&body)?;
            Ok(kvs)
        })
        .map(|res| Ok(res))
        // If there was an error, let the user know...
        .map_err(|err| {
            eprintln!("Error {:?}", err);
            err
        })
}

fn call_long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> impl Future<Item = Result<Vec<String>, SomeError>, Error = SomeError> {
    long_poll_request(url, client)
}

fn poll_long_poll_request() {
    let urls = vec!["http://www.baidu.com/", "http://www.google.com/"];
    let mut runtime = Runtime::new().unwrap();
    let client = Client::new();

    let mut futures = Vec::new();
    for url in urls {
        let url = url.parse::<hyper::Uri>().unwrap();
        let future = call_long_poll_request(url.clone(), client.clone());
        futures.push(future);
    }

    let stream = futures::stream::futures_unordered(futures);

    runtime
        .block_on(
            stream
                .for_each(|response_body| {
                    match response_body {
                        Ok(response_body) => {
                            println!("Long poll request response body: {:?}", response_body);
                            let url = "http://www.baidu.com/";
                            let url = url.parse::<hyper::Uri>().unwrap();
                            let future = call_long_poll_request(url, client.clone());
                            futures::future::Either::A(future.map(|_| ()))
                        }
                        Err(e) => {
                            let url = "http://www.baidu.com/";
                            let url = url.parse::<hyper::Uri>().unwrap();
                            println!("Long poll request error: {:?}", e);
                            let future = call_long_poll_request(url, client.clone());
                            futures::future::Either::B(future.map(|_| ()))
                        }
                    }
                    // futures::future::Either::B(futures::future::ok(()))
                })
                .map_err(|e| {
                    println!("Error: {:?}", e);
                    // i want to make a new future : let future = call_long_poll_request(url, client.clone());
                    // and then stream.push(future), so some request can continue to loop
                }),
        )
        .unwrap();
}
rust stream future rust-tokio
© www.soinside.com 2019 - 2024. All rights reserved.