在元类中控制上下文管理器

问题描述 投票:0回答:1

我想知道是否可以在元类和装饰器中自动控制上下文。我编写了一个装饰器函数,用于从 grpc 不安全通道创建存根:

def grpc_factory(grpc_server_address: str):
    print("grpc_factory")
    def grpc_connect(func):
        print("grpc_connect")
        def grpc_connect_wrapper(*args, **kwargs):
            with grpc.insecure_channel(grpc_server_address) as channel:
                stub = AnalyserStub(channel)
                return func(*args, stub=stub, **kwargs)
        return grpc_connect_wrapper
    return grpc_connect

然后我创建了一个元类,它使用上下文管理器以及以

grpc_
开头的每个方法,然后将存根注入到方法 kwargs 中:

class Client(type):
    @classmethod
    def __prepare__(metacls, name, bases, **kwargs):
        return super().__prepare__(name, bases, **kwargs)

    def __new__(cls, name, bases, attrs, **kwargs):
        if "grpc_server_address" not in kwargs:
            raise ValueError("""grpc_server_address is required on client class, see below example\n
            class MyClient(AnalyserClient, metaclass=Client, grpc_server_address='localhost:50051')""")
        for key, value in attrs.items():
            if callable(value) and key.startswith("grpc_"):
                attrs[key] = grpc_factory(kwargs["grpc_server_address"])(value)
        return super().__new__(cls, name, bases, attrs)

由此,我想从原始文件中创建所有未实现错误的方法:

class AnalyserClient(metaclass=Client, grpc_server_address="localhost:50051"):
    def grpc_analyse(self, *args, **kwargs):
        raise NotImplementedError("grpc_analyse is not implemented")

下面类的最终用例,并将存根放入方法参数中:

class AnalyserClient(AC, metaclass=Client, grpc_server_address="localhost:50051"):
    def grpc_analyse(self, text, stub) -> str:
        print("Analysing text: {}".format(text))
        print("Stub is ", stub)
        stub.AnalyseSentiment(text)
        return "Analysed"

我收到此错误,我认为这意味着通道不再打开,但我不确定如何更好地做到这一点,以确保所有用户都有一个简单的界面,可以安全地使用原型文件中定义的服务。

grpc_factory
grpc_connect
grpc_factory
grpc_connect
Inside grpc_connect_wrapper
Created channel
Analysing text: Hello World
Stub is  <grpc_implementation.protos.analyse_pb2_grpc.AnalyserStub object at 0x7f29d7726670>
ERROR:grpc._common:Exception serializing message!
Traceback (most recent call last):
  File "/python/venv/lib/python3.8/site-packages/grpc/_common.py", line 86, in _transform
    return transformer(message)
TypeError: descriptor 'SerializeToString' for 'google.protobuf.pyext._message.CMessage' objects doesn't apply to a 'str' object
Traceback (most recent call last):
  File "run_client.py", line 27, in <module>
    client.grpc_analyse("Hello World")
  File "/python/grpc_implementation/client/client.py", line 15, in grpc_connect_wrapper
    return func(*args, stub=stub, **kwargs)
  File "run_client.py", line 11, in grpc_analyse
    stub.AnalyseSentiment(text)
  File "/python/venv/lib/python3.8/site-packages/grpc/_channel.py", line 944, in __call__
    state, call, = self._blocking(request, timeout, metadata, credentials,
  File "/python/venv/lib/python3.8/site-packages/grpc/_channel.py", line 924, in _blocking
    raise rendezvous  # pylint: disable-msg=raising-bad-type
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "Exception serializing request!"
        debug_error_string = "None"

原型文件是:

syntax = "proto3";

package analyse;
option go_package = "./grpc_implementation";

service Analyser {
  rpc AnalyseSentiment(SentimentRequest) returns (SentimentResponse) {}
}

message SentimentRequest {
  string text = 1;
}

message SentimentResponse {
  string sentiment = 1;
}

下面是我在元类添加装饰器后尝试模拟的类。

class AnalyserClientTrad:
    def __init__(self, host: str = "localhost:50051"):
        self.host = host

    def grpc_analyse(self, text: str):
        with grpc.insecure_channel(self.host) as channel:
            stub = AnalyserStub(channel)
            response = stub.AnalyseSentiment(SentimentRequest(text=text))
            return response.sentiment

client = AnalyserClientTrad()
print(client.grpc_analyse("Hello, world!"))

我通过传统方式添加装饰器进一步测试了这一点,这也有效:

def grpc_factory(grpc_server_address: str):
    def grpc_connect(func):
        def grpc_connect_wrapper(*args, **kwargs):
            with grpc.insecure_channel(grpc_server_address) as channel:
                stub = AnalyserStub(channel)
                return func(*args, stub=stub, **kwargs)
        return grpc_connect_wrapper
    return grpc_connect



class AnalyserClientTradWithDecs:
    @grpc_factory("localhost:50051")
    def grpc_analyse(self, text: str, stub: AnalyserStub):
        response = stub.AnalyseSentiment(SentimentRequest(text=text))
        return response.sentiment

def run_client_with_decorator():
    client = AnalyserClientTradWithDecs()
    print(client.grpc_analyse("Hello, world!"))

如有任何帮助,我们将不胜感激。

python grpc decorator metaclass
1个回答
2
投票

您在这部分代码中遇到问题,您没有设置预期的原型对象,而是设置字符串

class AnalyserClient(AC, metaclass=Client, grpc_server_address="localhost:50051"):
    def grpc_analyse(self, text, stub) -> str:
        print("Analysing text: {}".format(text))
        print("Stub is ", stub)
        stub.AnalyseSentiment(text) #--> Error, use a proto object here.
        return "Analysed"

正确的方法是更改行

stub.AnalyseSentiment(text)
,用

stub.AnalyseSentiment(SentimentRequest(text=text))
© www.soinside.com 2019 - 2024. All rights reserved.