我正在尝试使用grpc实现pub子模式,但我对如何正确地执行它有点困惑。
我的原型:rpc call (google.protobuf.Empty) returns (stream Data);
客户:
asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
@Override
public void onNext(Data value) {
// process a data
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
} catch (StatusRuntimeException e) {
LOG.warn("RPC failed: {}", e.getStatus());
}
Thread.currentThread().join();
服务器服务:
public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
private final BlockingQueue<Data> queue;
private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();
public Sender(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
observers.add(responseObserver);
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// waiting for first element
Data data = queue.take();
// send head element
observers.forEach(o -> o.onNext(data));
} catch (InterruptedException e) {
LOG.error("error: ", e);
Thread.currentThread().interrupt();
}
}
}
}
如何正确地从全球观察者中删除客户?连接断开时如何接收某种信号? 如何管理客户端 - 服务器重新连接?如何在连接断开时强制客户端重新连接?
提前致谢!
在您的服务实施中:
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
observers.add(responseObserver);
}
您需要获取当前请求的Context和listen for cancellation。对于单请求,多响应调用(a.k.a.服务器流),gRPC生成的代码被简化为直接传递请求。这意味着您无法直接访问底层ServerCall.Listener
,这是您通常会侦听客户端断开连接和取消的方式。
相反,每个gRPC调用都有一个与之关联的Context
,它携带取消和其他请求范围的信号。对于您的情况,您只需要通过添加自己的侦听器来侦听取消,然后从链接的哈希集中安全地删除响应观察者。
至于重新连接:如果连接断开,gRPC客户端将自动重新连接,但通常不会重试RPC,除非这样做是安全的。对于服务器流式RPC,通常不安全,因此您需要直接在客户端上重试RPC。