@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
从其他线程与StreamObserver交互是否有什么注意事项? 例如,比如说 checkFeature()
异步点击另一个服务,返回一个CompletableFuture。
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
thenAccept(feature -> responseObserver.onNext(feature));
responseObserver.onCompleted();
}
当然,上面的方法是行不通的,因为第一个线程会执行 onCompleted()
才会返回该功能。所以我们来解决这个问题。
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
thenAccept(feature -> {
responseObserver.onNext(feature);
responseObserver.onCompleted();
});
}
我想这应该可以,但我是Java新手 所以我不知道会有什么后果。比如说
Context.current()
是一致的吗?onNext()
对于单调电话和 onError()
?如果有人也能给我讲讲他们是如何推理的,那就太好了。我试着找了一下实际实现的 StreamObserver
但我不知道该找什么。
使用 thenAccept()
呼叫 onNext()
和 onCompleted()
是可以的,因为观察者不是从多个线程中并发调用的。
这个 "坏了 "的例子在调用 onCompleted()
分别被打破,也是因为它可以从多个线程调用观察者,而不需要任何形式的同步。StreamObservers
不能同时从多个线程调用观察者。
使用 thenAccept()
但并不完全正确,因为它没有处理未来失败的情况。所以你需要接收 Throwable
也可以用 whenComplete()
:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
whenComplete((feature, t) -> {
if (t != null) {
responseObserver.onError(t);
} else {
responseObserver.onNext(feature);
responseObserver.onCompleted();
}
});
}
在处理这个lambda时,Context很容易 "出错"。通常情况下,我们会寻找 "架构 "解决方案来确保上下文被传播,比如将所有应用线程池封装在一起。Context.currentContextExecutor()
创建时,所以各个呼叫站点不需要关注传播问题。我还不够熟悉 CompletableFuture
来为其提供策略。
Context.current()会一致吗?
Context.current()
是使用 ThreadLocal
.如果你在不同的线程上访问它,它就不会是一致的。你可以在不同的线程之间传播上下文。你可能会发现 此职位 有用。
除了一元调用的onNext()和onError()之外,还有什么东西会导致StreamObserver被破坏或提前关闭吗?
是的,StreamObserver的正常流程以onError或onCompleted结束。
如 StreamObserver
javadoc 指出:"由于单个StreamObservers不是线程安全的,所以如果多个线程将并发地写入一个StreamObserver,应用程序必须同步调用"。如果你在调用StreamObserver 同时,你需要同步调用。换句话说,如果你知道即使你使用多个线程,也不会并发调用它,那应该没问题。
如果在多个线程上访问同一个StreamObserver,我会尝试同步它,除非性能很关键,因为它很容易出错。至少,值得好好评论一下。