我曾希望在 gRPC Context 库中找到一个简单的类来完成此操作,但遗憾的是没有运气。我想包装任意
ExecutorService
,以便它将包装其任务,以便传播提交它们的线程的当前上下文。
到目前为止我已经:
import io.grpc.Context;
public final class GrpcPropagatingExecutorService extends AbstractExecutorService {
private final ExecutorService delegate;
public GrpcPropagatingExecutorService(ExecutorService delegate) {
this.delegate = checkNotNull(delegate);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// Option 1: I could put code here...
super.newTaskFor(Context.current().wrap(runnable), value);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// ... and here.
super.newTaskFor(Context.current().wrap(callable));
}
@Override
public void execute(Runnable command) {
// Option 2: Or I could put code here.
super.execute(Context.current().wrap(command));
}
// Other methods are trivial...
}
我的问题是: 哪种方法是最好的方法,或者它们是等效的吗?
将包装代码放在
newTaskFor()
中对我来说感觉更好,但留下了一个问题(即,如果有人将服务用作普通 Executor
并直接调用 execute()
,这不会包装给定的 Runnable
,该怎么办? )
仅将包装代码放在
execute()
中可能没问题,但前提是始终在调度任务的线程中调用它,而不是在某些工作线程中调用(这在文档中根本不清楚,理论上我认为它包装此服务的服务可能会违反)。
将包装代码放入两个(腰带和大括号)中是可能的,但我需要一种新类型,以便我可以检测到已经包装的
Runnables
(而且我认为即使这样也不是特别健壮)。
那么我是否只需将东西包装在两个地方并忍受将几乎所有任务包装两次,或者我是否遗漏了一些东西?
编辑-我刚刚想到的另一个想法是:
private static final class GrpcPropagatingTask<V> extends FutureTask<V> {
public GrpcPropagatingTask(Callable<V> callable) {
super(Context.current().wrap(callable));
}
public GrpcPropagatingTask(Runnable runnable, V result) {
super(Context.current().wrap(runnable), result);
}
}
并从
newTaskFor()
方法返回它。然后在execute()
我可以做:
@Override
public void execute(Runnable command) {
if (!(command instanceof GrpcPropagatingTask)) {
command = Context.current().wrap(command);
}
delegate.execute(command);
}
我认为这可以解决我对
execute()
被直接打电话的恐惧。
到目前为止,我能想到的最佳答案是问题中的替代建议。它似乎有效,但我看不出为什么它是错误的明显方法:
FutureTask
的子类,并让它在构造函数中包装可调用/可运行(重要的是在此处包装,而不是在调用时包装)。 private static final class GrpcPropagatingTask<V> extends FutureTask<V> {
public GrpcPropagatingTask(Callable<V> callable) {
super(Context.current().wrap(callable));
}
public GrpcPropagatingTask(Runnable runnable, V result) {
super(Context.current().wrap(runnable), result);
}
}
AbstractExecutorService
创建 delegate
的子类,并重写 newTaskFor()
方法: private final ExecutorService delegate;
private GrpcPropagatingExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new GrpcPropagatingTask<>(runnable, value);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new GrpcPropagatingTask<>(callable);
}
execute()
有条件地包装 Runnable
实例: @Override
public void execute(Runnable command) {
if (!(command instanceof GrpcPropagatingTask)) {
command = Context.current().wrap(command);
}
delegate.execute(command);
}
AbstractExecutorService
的其他方法来轻松调用委托实例。现在应该是这样:
ExecutorService
方法的任何可运行对象/可调用对象都将使用调用线程的 gRPC 上下文进行包装。Executor
也可以。