与StreamObserver异步交互,即与Java 8以上的CompletableFutures交互是否安全?

问题描述 投票:0回答:1

我在看 来自grpc.io基础教程的简单RPC示例。:

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  responseObserver.onNext(checkFeature(request));
  responseObserver.onCompleted();
}

...

private Feature checkFeature(Point location) {
  for (Feature feature : features) {
    if (feature.getLocation().getLatitude() == location.getLatitude()
        && feature.getLocation().getLongitude() == location.getLongitude()) {
      return feature;
    }
  }

  // No feature was found, return an unnamed feature.
  return Feature.newBuilder().setName("").setLocation(location).build();
}

从其他线程与StreamObserver交互是否有什么注意事项? 例如,比如说 checkFeature() 异步点击另一个服务,返回一个CompletableFuture。

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      thenAccept(feature -> responseObserver.onNext(feature));
  responseObserver.onCompleted();
}

当然,上面的方法是行不通的,因为第一个线程会执行 onCompleted() 才会返回该功能。所以我们来解决这个问题。

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      thenAccept(feature -> {
        responseObserver.onNext(feature);
        responseObserver.onCompleted();
      });
}

我想这应该可以,但我是Java新手 所以我不知道会有什么后果。比如说

  • Context.current() 是一致的吗?
  • 除了会导致StreamObserver破坏或过早关闭外,还有什么会导致 onNext() 对于单调电话和 onError()?
  • 有没有更好的做法?

如果有人也能给我讲讲他们是如何推理的,那就太好了。我试着找了一下实际实现的 StreamObserver 但我不知道该找什么。

java thread-safety grpc completable-future grpc-java
1个回答
1
投票

使用 thenAccept() 呼叫 onNext()onCompleted() 是可以的,因为观察者不是从多个线程中并发调用的。

这个 "坏了 "的例子在调用 onCompleted() 分别被打破,也是因为它可以从多个线程调用观察者,而不需要任何形式的同步。StreamObservers 不能同时从多个线程调用观察者。

使用 thenAccept() 但并不完全正确,因为它没有处理未来失败的情况。所以你需要接收 Throwable 也可以用 whenComplete():

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      whenComplete((feature, t) -> {
        if (t != null) {
          responseObserver.onError(t);
        } else {
          responseObserver.onNext(feature);
          responseObserver.onCompleted();
        }
      });
}

在处理这个lambda时,Context很容易 "出错"。通常情况下,我们会寻找 "架构 "解决方案来确保上下文被传播,比如将所有应用线程池封装在一起。Context.currentContextExecutor() 创建时,所以各个呼叫站点不需要关注传播问题。我还不够熟悉 CompletableFuture 来为其提供策略。


1
投票

Context.current()会一致吗?

Context.current() 是使用 ThreadLocal.如果你在不同的线程上访问它,它就不会是一致的。你可以在不同的线程之间传播上下文。你可能会发现 此职位 有用。

除了一元调用的onNext()和onError()之外,还有什么东西会导致StreamObserver被破坏或提前关闭吗?

是的,StreamObserver的正常流程以onError或onCompleted结束。

StreamObserver javadoc 指出:"由于单个StreamObservers不是线程安全的,所以如果多个线程将并发地写入一个StreamObserver,应用程序必须同步调用"。如果你在调用StreamObserver 同时,你需要同步调用。换句话说,如果你知道即使你使用多个线程,也不会并发调用它,那应该没问题。

如果在多个线程上访问同一个StreamObserver,我会尝试同步它,除非性能很关键,因为它很容易出错。至少,值得好好评论一下。

© www.soinside.com 2019 - 2024. All rights reserved.