在 Rust warp 请求前后执行中间件

问题描述 投票:0回答:0

我想跟踪 warp 中的飞行中连接,以便在处理请求之前增加指标计数器,并在处理请求后减少。

我试图通过在链的开头使用“无操作”过滤器和在链的末尾使用自定义日志记录过滤器来解决这个问题;像这样的东西:

/// Increment the request count metric before the requests starts.
fn with_start_call_metrics() -> impl Filter<Extract = (), Error = Infallible> + Clone {
    warp::any()
        .and(path::full())
        .map(|path: FullPath| {
            HttpMetrics::inc_in_flight(path.as_str());
        })
        .untuple_one()
}

/// Decrement the request count metric after the request ended.
fn with_end_call_metrics() -> Log<fn(Info<'_>)> {
    warp::log::custom(|info| {
        HttpMetrics::dec_in_flight(info.path());
        // ... track more metrics, e.g. info.elapsed() ...
    })
}

当启动长时间运行的请求(下面代码中的

/slow
)并且在请求可以完全处理之前断开连接(例如
CTRL-C
on
curl
)时,就会出现问题。

在这种情况下,

slow
路线只是被 warp 中止,并且永远不会到达下面的
with_end_call_metrics
过滤器:

#[tokio::main]
async fn main() {
    let hello = warp::path!("hello" / String).and_then(hello);
    let slow = warp::path!("slow").and_then(slow);

    warp::serve(
        with_start_call_metrics()
            .and(
                hello.or(slow), // ... and more ...
            )
            // If the call (e.g. of `slow`) is cancelled, this is never reached.
            .with(with_end_call_metrics()),
    )
    .run(([127, 0, 0, 1], 8080))
    .await;
}

async fn hello(name: String) -> Result<impl warp::Reply, warp::Rejection> {
    Ok(format!("Hello, {}!", name))
}

async fn slow() -> Result<impl warp::Reply, warp::Rejection> {
    tokio::time::sleep(Duration::from_secs(5)).await;
    Ok(format!("That was slow."))
}

我知道这是正常行为,推荐的方法是依赖请求中类型的

Drop
实现,因为它总是会被调用,所以像:

async fn in_theory<F, T, E>(filter: F) -> Result<T, E>
where
    F: Filter<Extract = T, Error = E>
{
    let guard = TrackingGuard::new();
    filter.await
}

但这不起作用。我试过像这样使用

wrap_fn

pub fn in_theory<F>(filter: F) -> Result<F::Extract, F::Error>
where
    F: Filter + Clone,
{
    warp::any()
        .and(filter)
        .wrap_fn(|f| async { 
             // ... magic here ...
             f.await 
        })
}

但无论我尝试什么,它总是以这样的错误结束:

error[E0277]: the trait bound `<F as warp::filter::FilterBase>::Error: reject::sealed::CombineRejection<Infallible>` is not satisfied
   --> src/metrics.rs:255:25
    |
255 |         warp::any().and(filter).wrap_fn(|f| async { f.await })
    |                     --- ^^^^^^ the trait `reject::sealed::CombineRejection<Infallible>` is not implemented for `<F as warp::filter::FilterBase>::Error`
    |                     |
    |                     required by a bound introduced by this call

并且无法指定,因为

reject::sealed
不是公共模块。 任何帮助表示赞赏!

rust middleware rust-warp
© www.soinside.com 2019 - 2024. All rights reserved.