缺少实现(但帮助消息说我已经实现了)

问题描述 投票:0回答:1

我正在编写一个 tonic 中间件来将应用程序状态加载到 gRPC 处理程序中,但我无法让我的应用程序与中间件层一起服务

我已经仔细检查了类型并查看了许多示例,但我无法说出我的实现中出了什么问题。不仅如此,编译器错误中的

help
消息似乎表明我已经实现了?

任何帮助将不胜感激!

编译时出现完整错误:

error[E0277]: the trait bound `RegistryApplicationStateMiddleware<Routes>: Service<http::request::Request<tonic::transport::Body>>` is not satisfied
  --> registry/src/main.rs:39:6
   |
39 |     .serve(addr)
   |      ^^^^^ the trait `Service<http::request::Request<tonic::transport::Body>>` is not implemented for `RegistryApplicationStateMiddleware<Routes>`
   |
   = help: the trait `Service<http::Request<tonic::transport::Body>>` is implemented for `RegistryApplicationStateMiddleware<S>`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `registry` (bin "registry") due to 1 previous error

注意帮助消息表明该特征是为中间件的通用版本实现的?但不知何故与路线不匹配...

中间件.rs

use std::{convert::Infallible, task::{Context, Poll}};

use data_access::db::ConnectionPool;
use http::Response;
use tonic::{body::BoxBody, server::NamedService};
use tower_service::Service;

#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationState {
    pub pool: ConnectionPool,
}


#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationStateLayer {
    pub pool: ConnectionPool,
}

impl<S> tower_layer::Layer<S> for RegistryApplicationStateLayer {
    type Service = RegistryApplicationStateMiddleware<S>;

    fn layer(&self, service: S) -> Self::Service {
        RegistryApplicationStateMiddleware {
            inner: service,
            pool: self.pool.clone(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct RegistryApplicationStateMiddleware<S> {
    inner: S,
    pub pool: ConnectionPool,
}


impl<S> Service<http::request::Request<tonic::transport::Body>> for RegistryApplicationStateMiddleware<S>
where
S: Service<http::request::Request<tonic::transport::Body>, Response = Response<BoxBody>, Error = Infallible>
    + NamedService 
    + Clone
    + Send + 'static,
    S::Future: Send + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, mut req: http::request::Request<tonic::transport::Body>) -> Self::Future {
        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
        // for details on why this is necessary
        let clone = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, clone);

        req.extensions_mut().insert(RegistryApplicationState {
            pool: self.pool.clone(),
        });

        Box::pin(async move {
            // Do extra async work here...
            let response = inner.call(req).await?;

            Ok(response)
        })
    }
}

main.rs

use middleware::RegistryApplicationStateLayer;
use tonic::transport::Server;

use client::registry_service_server::RegistryServiceServer;
use server::DemoClient;


use data_access::db::ConnectionPool;

pub mod client;
mod middleware;
pub mod server;

mod store_proto {
  include!("client.rs");

  pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
    tonic::include_file_descriptor_set!("store_descriptor");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  dotenv::dotenv().ok();
  let addr = "127.0.0.1:9001".parse()?;
  let demo_registry_service = DemoClient::default();

  let reflection_service = tonic_reflection::server::Builder::configure()
    .register_encoded_file_descriptor_set(store_proto::FILE_DESCRIPTOR_SET)
    .build()
    .unwrap();

    let pool = ConnectionPool::default();


  Server::builder()
    .layer(RegistryApplicationStateLayer { pool })
    .add_service(RegistryServiceServer::new(demo_registry_service))
    .add_service(reflection_service)
    .serve(addr)
    .await?;
  Ok(())
}

另一个注意事项,这就是 Router 服务实现在 tonic 库中的样子,我试图在我的实现中模拟它:

/// A [`Service`] router.
#[derive(Debug, Default, Clone)]
pub struct Routes {
    router: axum::Router,
}

impl Routes {
    pub(crate) fn new<S>(svc: S) -> Self
    where
        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + 'static,
        S::Future: Send + 'static,
        S::Error: Into<crate::Error> + Send,
    {
        let router = axum::Router::new().fallback(unimplemented);
        Self { router }.add_service(svc)
    }

    pub(crate) fn add_service<S>(mut self, svc: S) -> Self
    where
        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + 'static,
        S::Future: Send + 'static,
        S::Error: Into<crate::Error> + Send,
    {
        let svc = svc.map_response(|res| res.map(axum::body::boxed));
        self.router = self
            .router
            .route_service(&format!("/{}/*rest", S::NAME), svc);
        self
    }

    pub(crate) fn prepare(self) -> Self {
        Self {
            // this makes axum perform update some internals of the router that improves perf
            // see https://docs.rs/axum/latest/axum/routing/struct.Router.html#a-note-about-performance
            router: self.router.with_state(()),
        }
    }
}

最后,这是我的 Cargo.toml 文件

[package]
name = "registry"
version = "0.1.0"
build = "build.rs"
edition = "2021"
publish = false

[dependencies]
axum = { version = "0.7.5", features = [ "json" ] }
clap = { version = "4.1.4", features = ["derive"] }
dotenv = "0.15.0"
futures = "0.3"
http = "1.1.0"
hyper = "1.2.0"
prost = "0.11"
tokio = { version = "1.37", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tonic-reflection = "0.6.0"
tower-http = { version = "0.5.0", features = ["trace"] }
tower-layer = "0.3.2"
tower-request-id = "0.3.0"
tower-service = "0.3.2"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
bytes = "1.6.0"

[build-dependencies]
tonic-build = "0.8"

[dev-dependencies]
uuid = { version = "1.2.2", features = ["v4", "fast-rng"] }
futures-util = "0.3.25"
anyhow = "1"


generics rust traits implementation
1个回答
0
投票

孩子们,一定要仔细检查你的依赖关系。问题是我安装了

hyper 1.0
并且 Tonic 还不支持 hyper 1.0。这是工作(编译)中间件
hyper = "0.14.28"


use std::task::{Context, Poll};

use axum::extract::FromRef;
use data_access::db::Pool;
use hyper::body::Body;
use tonic::body::BoxBody;
use tower_service::Service;

#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationState {
  pub pool: Pool,
}

impl FromRef<RegistryApplicationState> for Pool {
  fn from_ref(app_state: &RegistryApplicationState) -> Pool {
    app_state.pool.clone()
  }
}

#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationStateLayer {
  pub pool: Pool,
}

impl<S> tower_layer::Layer<S> for RegistryApplicationStateLayer {
  type Service = RegistryApplicationStateMiddleware<S>;

  fn layer(&self, service: S) -> Self::Service {
    RegistryApplicationStateMiddleware {
      inner: service,
      pool: self.pool.clone(),
    }
  }
}

#[derive(Debug, Clone)]
pub struct RegistryApplicationStateMiddleware<S> {
  inner: S,
  pub pool: Pool,
}

impl<S> Service<hyper::Request<Body>>
  for RegistryApplicationStateMiddleware<S>
  where
      S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
      S::Future: Send + 'static,
  {
  type Response = S::Response;
  type Error = S::Error;
  type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

  fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    self.inner.poll_ready(cx)
  }

  fn call(&mut self, mut req: hyper::Request<Body>) -> Self::Future {
    // This is necessary because tonic internally uses `tower::buffer::Buffer`.
    // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
    // for details on why this is necessary
    let clone = self.inner.clone();
    let mut inner = std::mem::replace(&mut self.inner, clone);

    req.extensions_mut().insert(RegistryApplicationState {
      pool: self.pool.clone(),
    });

    Box::pin(async move {
      // Do extra async work here...
      let response = inner.call(req).await?;

      Ok(response)
    })
  }
}


© www.soinside.com 2019 - 2024. All rights reserved.