有没有一个好的方法可以将FutureLocal.java添加到自定义的Future.java扩展CompletableFuture中?(示例代码如下)

问题描述 投票:0回答:1

我有下面的代码,除了当我调用super.thenCompose时,它返回一个CompletableFuture,而不是我的Custom Future.java,这是很关键的。 我试图复制twitter的scala futures,即 "未来"。

  1. 能够像twitter scala的期货一样,添加取消链。
  2. 可以让请求上下文流经thenApply和thenCompose链,以修复slf4j中的MDC(很像ThreadLocal,但它是在每个lambda运行之前重新应用的,如下面的代码所示)。

    public class Future extends CompletableFuture {

    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        Map<String, Object> state = FutureLocal.fetchState();
        MyFunction f = new MyFunction(state, fn);       
    
        return super.thenApply(f);
    }
    
    @Override
    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
        Map<String, Object> state = FutureLocal.fetchState();
        MyFunction f = new MyFunction(state, fn);
    
        return super.thenCompose(f);
    }
    
    @SuppressWarnings("hiding")
    private class MyFunction implements Function {
    
        private Map<String, Object> state;
        private Function fn;
    
        public MyFunction(Map<String, Object> state, @SuppressWarnings("rawtypes") Function fn) {
            this.state = state;
            this.fn = fn;
    
        }
    
        @Override
        public Object apply(Object t) {
    
            try {
                FutureLocal.restoreState(state);
    
                return fn.apply(t);
    
            } finally {
                FutureLocal.restoreState(null);
            }
    
    
        }
    
    }
    
    @Override
    public boolean complete(T value) {
        return super.complete(value);
    }
    
    @Override
    public boolean completeExceptionally(Throwable ex) {
        return super.completeExceptionally(ex);
    }
    

    }

下面是我用来运行该代码的一些代码,但是在地图中记录 "测试 "在第3次远程调用时开始失败,这意味着slf4j MDC会崩溃。

public class TestCustomFutures {

    private Executor exec = Executors.newFixedThreadPool(3);

    @Test
    public void testFutureContext() throws InterruptedException, ExecutionException {

        Set<Integer> hashSet = new HashSet<Integer>();

        FutureLocal.put("test", 100);

        CompletableFuture<Integer> f = myRemoteCall(4)
            .thenCompose(s -> myRemoteCall(3))
            .thenCompose(s -> myRemoteCall(2));

        f.get();
    }

    private Future<Integer> myRemoteCall(int i) {
        System.out.println("result="+i+" map="+FutureLocal.get("test")+" thread="+Thread.currentThread().getName());

        Future<Integer> f = new Future<Integer>();

        exec.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    f.completeExceptionally(e);
                }

                f.complete(i);
            }
        });

        return f;
    }
}

然后输出是这样的

result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2

请注意,最后一个值我们不希望是空的

java completable-future
1个回答
0
投票

ahhh, 我错过了一个简单的事情,因为我是在jdk8中。 然而,在jdk11中,你可以覆盖这个... ...

@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
    return new Future<U>();
}

在jdk8中,由于某些原因,这个不能编译,也不能调用这个:(。) 糟了,我还不想升级到11,因为有些用法还在jdk8上:().

© www.soinside.com 2019 - 2024. All rights reserved.