我想跟踪 warp 中的飞行中连接,以便在处理请求之前增加指标计数器,并在处理请求后减少。
我试图通过在链的开头使用“无操作”过滤器和在链的末尾使用自定义日志记录过滤器来解决这个问题;像这样的东西:
/// Increment the request count metric before the requests starts.
fn with_start_call_metrics() -> impl Filter<Extract = (), Error = Infallible> + Clone {
warp::any()
.and(path::full())
.map(|path: FullPath| {
HttpMetrics::inc_in_flight(path.as_str());
})
.untuple_one()
}
/// Decrement the request count metric after the request ended.
fn with_end_call_metrics() -> Log<fn(Info<'_>)> {
warp::log::custom(|info| {
HttpMetrics::dec_in_flight(info.path());
// ... track more metrics, e.g. info.elapsed() ...
})
}
当启动长时间运行的请求(下面代码中的
/slow
)并且在请求可以完全处理之前断开连接(例如CTRL-C
on curl
)时,就会出现问题。
在这种情况下,
slow
路线只是被 warp 中止,并且永远不会到达下面的 with_end_call_metrics
过滤器:
#[tokio::main]
async fn main() {
let hello = warp::path!("hello" / String).and_then(hello);
let slow = warp::path!("slow").and_then(slow);
warp::serve(
with_start_call_metrics()
.and(
hello.or(slow), // ... and more ...
)
// If the call (e.g. of `slow`) is cancelled, this is never reached.
.with(with_end_call_metrics()),
)
.run(([127, 0, 0, 1], 8080))
.await;
}
async fn hello(name: String) -> Result<impl warp::Reply, warp::Rejection> {
Ok(format!("Hello, {}!", name))
}
async fn slow() -> Result<impl warp::Reply, warp::Rejection> {
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(format!("That was slow."))
}
我知道这是正常行为,推荐的方法是依赖请求中类型的
Drop
实现,因为它总是会被调用,所以像:
async fn in_theory<F, T, E>(filter: F) -> Result<T, E>
where
F: Filter<Extract = T, Error = E>
{
let guard = TrackingGuard::new();
filter.await
}
但这不起作用。我试过像这样使用
wrap_fn
:
pub fn in_theory<F>(filter: F) -> Result<F::Extract, F::Error>
where
F: Filter + Clone,
{
warp::any()
.and(filter)
.wrap_fn(|f| async {
// ... magic here ...
f.await
})
}
但无论我尝试什么,它总是以这样的错误结束:
error[E0277]: the trait bound `<F as warp::filter::FilterBase>::Error: reject::sealed::CombineRejection<Infallible>` is not satisfied
--> src/metrics.rs:255:25
|
255 | warp::any().and(filter).wrap_fn(|f| async { f.await })
| --- ^^^^^^ the trait `reject::sealed::CombineRejection<Infallible>` is not implemented for `<F as warp::filter::FilterBase>::Error`
| |
| required by a bound introduced by this call
并且无法指定,因为
reject::sealed
不是公共模块。
任何帮助表示赞赏!