如何从Python gRPC拦截器返回错误?

问题描述 投票:0回答:1

如何在此拦截器中检查身份验证并因身份验证错误而拒绝请求?

class Authentication(grpc.ServerInterceptor):
    def intercept_service(self, continuation, details):
        # How to raise an error?

我已经看过这个例子,但这似乎是一种较旧的拦截器编写风格。我正在扩展

grpc.ServerInterceptor
类并将拦截器传递到服务器像这样。像这样编写的拦截器是否可以返回身份验证错误?

python grpc interceptor
1个回答
0
投票

您所指的示例是针对客户端拦截器的。

这个服务器拦截器的示例应该适合您:

"""Interceptor that ensures a specific header is present."""

import grpc


def _unary_unary_rpc_terminator(code, details):
    def terminate(ignored_request, context):
        context.abort(code, details)

    return grpc.unary_unary_rpc_method_handler(terminate)


class RequestHeaderValidatorInterceptor(grpc.ServerInterceptor):
    def __init__(self, header, value, code, details):
        self._header = header
        self._value = value
        self._terminator = _unary_unary_rpc_terminator(code, details)

    def intercept_service(self, continuation, handler_call_details):
        if (
            self._header,
            self._value,
        ) in handler_call_details.invocation_metadata:
            return continuation(handler_call_details)
        else:
            return self._terminator

对于身份验证错误使用 UNAUTHENTICATED 状态代码,对于授权错误使用 PERMISSION_DENIED。

© www.soinside.com 2019 - 2024. All rights reserved.