我有下面的代码,除了当我调用super.thenCompose时,它返回一个CompletableFuture,而不是我的Custom Future.java,这是很关键的。 我试图复制twitter的scala futures,即 "未来"。
可以让请求上下文流经thenApply和thenCompose链,以修复slf4j中的MDC(很像ThreadLocal,但它是在每个lambda运行之前重新应用的,如下面的代码所示)。
public class Future extends CompletableFuture {
@Override
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
Map<String, Object> state = FutureLocal.fetchState();
MyFunction f = new MyFunction(state, fn);
return super.thenApply(f);
}
@Override
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
Map<String, Object> state = FutureLocal.fetchState();
MyFunction f = new MyFunction(state, fn);
return super.thenCompose(f);
}
@SuppressWarnings("hiding")
private class MyFunction implements Function {
private Map<String, Object> state;
private Function fn;
public MyFunction(Map<String, Object> state, @SuppressWarnings("rawtypes") Function fn) {
this.state = state;
this.fn = fn;
}
@Override
public Object apply(Object t) {
try {
FutureLocal.restoreState(state);
return fn.apply(t);
} finally {
FutureLocal.restoreState(null);
}
}
}
@Override
public boolean complete(T value) {
return super.complete(value);
}
@Override
public boolean completeExceptionally(Throwable ex) {
return super.completeExceptionally(ex);
}
}
下面是我用来运行该代码的一些代码,但是在地图中记录 "测试 "在第3次远程调用时开始失败,这意味着slf4j MDC会崩溃。
public class TestCustomFutures {
private Executor exec = Executors.newFixedThreadPool(3);
@Test
public void testFutureContext() throws InterruptedException, ExecutionException {
Set<Integer> hashSet = new HashSet<Integer>();
FutureLocal.put("test", 100);
CompletableFuture<Integer> f = myRemoteCall(4)
.thenCompose(s -> myRemoteCall(3))
.thenCompose(s -> myRemoteCall(2));
f.get();
}
private Future<Integer> myRemoteCall(int i) {
System.out.println("result="+i+" map="+FutureLocal.get("test")+" thread="+Thread.currentThread().getName());
Future<Integer> f = new Future<Integer>();
exec.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
f.completeExceptionally(e);
}
f.complete(i);
}
});
return f;
}
}
然后输出是这样的
result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2
请注意,最后一个值我们不希望是空的
ahhh, 我错过了一个简单的事情,因为我是在jdk8中。 然而,在jdk11中,你可以覆盖这个... ...
@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
return new Future<U>();
}
在jdk8中,由于某些原因,这个不能编译,也不能调用这个:(。) 糟了,我还不想升级到11,因为有些用法还在jdk8上:().