采用Guava的AbstractExecutorService实现(gRPC)上下文传播的最佳方式?

问题描述 投票:0回答:1

我曾希望在 gRPC Context 库中找到一个简单的类来完成此操作,但遗憾的是没有运气。我想包装任意

ExecutorService
,以便它将包装其任务,以便传播提交它们的线程的当前上下文。

到目前为止我已经:

import io.grpc.Context;

public final class GrpcPropagatingExecutorService extends AbstractExecutorService {
  private final ExecutorService delegate;

  public GrpcPropagatingExecutorService(ExecutorService delegate) {
    this.delegate = checkNotNull(delegate);
  }

  @Override
  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    // Option 1: I could put code here...
    super.newTaskFor(Context.current().wrap(runnable), value);
  }

  @Override
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    // ... and here.
    super.newTaskFor(Context.current().wrap(callable));
  }

  @Override
  public void execute(Runnable command) {
    // Option 2: Or I could put code here.
    super.execute(Context.current().wrap(command));    
  }

  // Other methods are trivial...
}

我的问题是: 哪种方法是最好的方法,或者它们是等效的吗?

将包装代码放在

newTaskFor()
中对我来说感觉更好,但留下了一个问题(即,如果有人将服务用作普通
Executor
并直接调用
execute()
,这不会包装给定的
Runnable
,该怎么办? )

仅将包装代码放在

execute()
中可能没问题,但前提是始终在调度任务的线程中调用它,而不是在某些工作线程中调用(这在文档中根本不清楚,理论上我认为它包装此服务的服务可能会违反)。

将包装代码放入两个(腰带和大括号)中是可能的,但我需要一种新类型,以便我可以检测到已经包装的

Runnables
(而且我认为即使这样也不是特别健壮)。

那么我是否只需将东西包装在两个地方并忍受将几乎所有任务包装两次,或者我是否遗漏了一些东西?

编辑-我刚刚想到的另一个想法是:

  private static final class GrpcPropagatingTask<V> extends FutureTask<V> {
    public GrpcPropagatingTask(Callable<V> callable) {
      super(Context.current().wrap(callable));
    }

    public GrpcPropagatingTask(Runnable runnable, V result) {
      super(Context.current().wrap(runnable), result);
    }
  }

并从

newTaskFor()
方法返回它。然后在
execute()
我可以做:

  @Override
  public void execute(Runnable command) {
    if (!(command instanceof GrpcPropagatingTask)) {
      command = Context.current().wrap(command);
    }
    delegate.execute(command);
  }

我认为这可以解决我对

execute()
被直接打电话的恐惧。

java grpc guava
1个回答
0
投票

到目前为止,我能想到的最佳答案是问题中的替代建议。它似乎有效,但我看不出为什么它是错误的明显方法:

  1. 创建
    FutureTask
    的子类,并让它在构造函数中包装可调用/可运行(重要的是在此处包装,而不是在调用时包装)。
  private static final class GrpcPropagatingTask<V> extends FutureTask<V> {
    public GrpcPropagatingTask(Callable<V> callable) {
      super(Context.current().wrap(callable));
    }

    public GrpcPropagatingTask(Runnable runnable, V result) {
      super(Context.current().wrap(runnable), result);
    }
  }
  1. 然后使用
    AbstractExecutorService
    创建
    delegate
    的子类,并重写
    newTaskFor()
    方法:
  private final ExecutorService delegate;

  private GrpcPropagatingExecutorService(ExecutorService delegate) {
    this.delegate = delegate;
  }

  @Override
  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new GrpcPropagatingTask<>(runnable, value);
  }

  @Override
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new GrpcPropagatingTask<>(callable);
  }
  1. 然后实现
    execute()
    有条件地包装
    Runnable
    实例:
  @Override
  public void execute(Runnable command) {
    if (!(command instanceof GrpcPropagatingTask)) {
      command = Context.current().wrap(command);
    }
    delegate.execute(command);
  }
  1. 实现
    AbstractExecutorService
    的其他方法来轻松调用委托实例。

现在应该是这样:

  1. 传递给任何
    ExecutorService
    方法的任何可运行对象/可调用对象都将使用调用线程的 gRPC 上下文进行包装。
  2. 即使将该服务用作普通的
    Executor
    也可以。
  3. 任务不会被打包多次。
© www.soinside.com 2019 - 2024. All rights reserved.