grpc 异步双向服务器(Java/Python)

问题描述 投票:0回答:1

这是我的场景。 grpc 服务器是一个异步服务器,它从其他源订阅数据。 它还为其客户端提供了订阅功能,因此一旦从其他来源接收到数据,它就可以将数据推送到 grpc 客户端。服务器是用Java实现的。

@Override
public StreamObserver<Message> subscribe (
        StreamObserver<Message> responseObserver) {
    return new StreamObserver<Message>() {

        @Override
        public void onNext(Message message) {
            api.subscribe(message.value, Message -> {
                synchronized (responseObserver) {
                    ...
                    ...
                    // get data from other source

                    responseObserver.onNext(Converter.create().toProtobuf(Message.class, data));
                }
            });
        }

        @Override
        public void onError(Throwable t) {
            log.warn("Encountered error in sub", t);
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }
    };
}

我想使用python实现一个grpc客户端来订阅该服务器。然而,奇怪的是,Python一旦订阅数据,就立即关闭,而不等待Java服务器的异步返回。然而,Java 客户端可以永远运行并等待来自服务器的异步数据。

原型

message Message{
    string value = 1;
}
service test {
    rpc subscribe(stream Message) returns (stream Message) {}
}

Python客户端代码(不工作)

def gen_message():
    yield test.Message(value="2")

def run():
    channel = grpc.insecure_channel('localhost:50051')
    stub = test_grpc.MessengerStub(channel)

    stream = stub.subscribe(gen_message())
    try:
        for e in stream:
            print(e)
    except grpc._channel._Rendezvous as err:
        print(err)

run()

Java 代码(工作)

StreamObserver<Message> requestObserver = stub.subscribe(new StreamObserver<Message>() {
    @Override
    public void onNext(Message message) {
        System.out.println(message)
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onCompleted() {

    }
});
Message message = Message.newBuilder().build();
requestObserver.onNext(message);

我已经很困惑了。如何在Python客户端中实现相同的功能? 谢谢~

附注如果服务器是 while True 服务器而不是异步服务器,则 python 客户端可以工作。我怀疑 python 客户端不知道“异步”服务器,一旦它的流没有新数据,它就会关闭连接。

java python grpc grpc-java grpc-python
1个回答
0
投票

这是Python API的一个“bug”功能,一旦request_iterator耗尽,它会在向服务器发送请求时自动添加onComplete消息。 在您的情况下,要修复它,您可以将 request_iterator 更改为队列,并基于队列创建一个无限生成器。 如果您也拥有 Java 服务器端,则可以忽略 onComplete 消息,该消息也有效。

© www.soinside.com 2019 - 2024. All rights reserved.