使用 grpc.aio python 记录拦截器

问题描述 投票:0回答:1

我找不到 python 日志拦截器的示例实现来测量处理请求所需的时间。

我想测量服务器端处理请求需要多长时间。我尝试编写自己的日志拦截器但没有成功。

在我看来,

continuation
不会等待请求完成。

这是我的 .proto 文件

syntax = "proto3";

package auth;

service SampleService {
  rpc Hello(HelloRequest) returns (HelloResponse);
}

message HelloRequest{
  string Name = 1;
}

message HelloResponse{
  string Greeting = 1;
}

grpc.py

import asyncio
from typing import Callable, Awaitable

import grpc

from pkg.py.gen import service_pb2
from pkg.py.gen import service_pb2_grpc
from ..logger import get_logger
from ..services import Service


class SampleService(service_pb2_grpc.SampleServiceServicer):
    def __init__(self) -> None:
        super().__init__()

    async def Hello(self, request: service_pb2.HelloRequest, _) -> service_pb2.HelloResponse:
        await asyncio.sleep(1)
        return service_pb2.HelloResponse(Greeting=Service.hello(request.Name))


class LoggingServerInterceptor(grpc.aio.ServerInterceptor):
    async def intercept_service(
            self,
            continuation: Callable[
                [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
            ],
            handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        ...

应用程序.py

from abc import ABC, abstractmethod
from concurrent import futures

import grpc.aio

from pkg.py.gen import service_pb2_grpc
from ..config import Config
from ..logger import get_logger
from ..transport import SampleService, LoggingServerInterceptor


class AbstractApp(ABC):
    @abstractmethod
    async def run(self) -> None:
        ...

    @abstractmethod
    async def stop(self) -> None:
        ...


class App(AbstractApp):
    def __init__(self):
        self.server = grpc.aio.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=[LoggingServerInterceptor()]
        )
        service_pb2_grpc.add_SampleServiceServicer_to_server(
            SampleService(), self.server
        )
        address = f"{Config().HOST}:{Config().PORT}"
        logger = get_logger()
        logger.info(f"Listening on {address}")
        self.server.add_insecure_port(address)

    async def run(self) -> None:
        await self.server.start()
        await self.server.wait_for_termination()

    async def stop(self) -> None:
        logger = get_logger()
        logger.info("Shutting down in progress")

主.py

import asyncio
import sys
from pathlib import Path

root = Path(__file__).resolve().parents[2].__str__()  # noqa: E402
sys.path.append(root)  # noqa: E402

from internal.config import Config
from internal.logger import get_logger, setup_logger
from internal.app.app import App


def main() -> None:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    app = App()
    logger = get_logger()

    loop.create_task(app.run())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        logger.info("Gracefully shutting down")
        loop.run_until_complete(app.stop())
        loop.close()
        logger.info("Shut down completed")


if __name__ == '__main__':
    main()
python python-asyncio grpc interceptor grpc-python
1个回答
0
投票

找到那个库

class RequestLoggingInterceptor(AsyncServerInterceptor):
    async def intercept(
            self,
            method: Callable,
            request: Any,
            context: grpc.ServicerContext,
            method_name: str,
    ) -> Any:
        start_time = time.monotonic()

        try:
            return await method(request, context)
        finally:
            request_time = time.monotonic() - start_time
            get_logger().info(
                f"Request {method_name} - duration {request_time}s"
            )
© www.soinside.com 2019 - 2024. All rights reserved.