CompletableFutures: processing CompletableFuture chains in parallel

Question

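I am designing an asynchronous call with CompletableFutures. It is a batch call in which I need to process several entities at once. At the end of the call I have to collect information about the processing status of each item.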

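As input I have a set of IDs of these entities. The entity is complex: I have to make several DAO calls to assemble it into one object. Each DAO method returns a CompletableFuture<PartX>.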

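I am chaining those DAO calls because if one of the parts does not exist, I cannot construct the whole object. Here is what my code snippet looks like: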

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

public class CfChainsAllOfTest {

    private DAO dao = new DAO();

    public static void main(String[] args) {
        CompletableFuture<Void> resultPrintingCf = new CfChainsAllOfTest().fetchAllInParallelAndCollect(Lists.newArrayList(1L, 2L, 3L)).thenAccept(results -> {
            System.out.println("[" + Thread.currentThread().getName() + "]" + results);
        });
        resultPrintingCf.join();
    }

    private CompletableFuture<List<Item>> fetchAllInParallelAndCollect(List<Long> ids) {
        List<CompletableFuture<Item>> cfs = Lists.newArrayList();
        for (Long id : ids) {
            // I want this to be an instant non-blocking operation
            cfs.add(fetchSingle(id));
            System.out.println("[" + Thread.currentThread().getName() + "]" + "After completable future was added to the list, id=" + id);
        }
        return waitAllOfAndCollect(cfs);
    }

    private CompletableFuture<Item> fetchSingle(Long id) {
        return getPartCAndSetOnItem(new Item(id)).thenCompose(this::getPartBAndSetOnItem).thenCompose(this::getPartAAndSetOnItem);
    }

    public CompletableFuture<Item> getPartCAndSetOnItem(Item item) {
        return dao.getPartC(item.getId()).thenCompose(partC -> {
            CompletableFuture<Item> cf = new CompletableFuture<>();
            item.setPartC(partC);
            cf.complete(item);
            return cf;
        });
    }

    public CompletableFuture<Item> getPartBAndSetOnItem(Item item) {
        return dao.getPartB(item.getId()).thenCompose(partB -> {
            CompletableFuture<Item> cf = new CompletableFuture<>();
            item.setPartB(partB);
            cf.complete(item);
            return cf;
        });
    }

    public CompletableFuture<Item> getPartAAndSetOnItem(Item item) {
        return dao.getPartA(item.getId()).thenCompose(partA -> {
            CompletableFuture<Item> cf = new CompletableFuture<>();
            item.setPartA(partA);
            cf.complete(item);
            return cf;
        });
    }

    private static <T> CompletableFuture<List<T>> waitAllOfAndCollect(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v -> futures.stream().map(future -> future.join()).collect(Collectors.<T> toList()));
    }

    static class DAO {

        public CompletableFuture<PartC> getPartC(Long id) {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part C from database for id=" + id);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
                }
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Part C fetched from db for id=" + id);
                return new PartC();
            });
        }

        public CompletableFuture<PartB> getPartB(Long id) {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part B from database for id=" + id);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
                }
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Part B fetched from db for id=" + id);
                return new PartB();
            });
        }

        public CompletableFuture<PartA> getPartA(Long id) {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part A from database for id=" + id);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
                }
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Part A fetched from db for id=" + id);
                return new PartA();
            });
        }

    }

    static class Item {

        private final Long id;

        private PartA partA;
        private PartB partB;
        private PartC partC;

        public Item(Long id) {
            this.id = id;
        }

        public Long getId() {
            return id;
        }

        public PartA getPartA() {
            return partA;
        }

        public void setPartA(PartA partA) {
            this.partA = partA;
        }

        public PartB getPartB() {
            return partB;
        }

        public void setPartB(PartB partB) {
            this.partB = partB;
        }

        public PartC getPartC() {
            return partC;
        }

        public void setPartC(PartC partC) {
            this.partC = partC;
        }

        @Override
        public String toString() {
            return "Item [id=" + id + ", partA=" + partA + ", partB=" + partB + ", partC=" + partC + "]";
        }

    }

    static class PartA {
        @Override
        public String toString() {
            return "Part A";
        }

    }

    static class PartB {
        @Override
        public String toString() {
            return "Part B";
        }
    }

    static class PartC {
        @Override
        public String toString() {
            return "Part C";
        }
    }

}

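The problem is that, because of the chaining, the processing of each item is not really done in parallel. It looks as if chaining CompletableFutures is a blocking call. I would expect the chain of CFs to return a CompletableFuture<Whole> immediately and only then start computing the value.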

That said, what would be the best way to achieve this behavior? Thanks.

java asynchronous completable-future
1 Answer

The problem lies in this method:

private CompletableFuture<Item> fetchSingle(Long id) {
    return getPartCAndSetOnItem(new Item(id)).thenCompose(this::getPartBAndSetOnItem).thenCompose(this::getPartAAndSetOnItem);
}

Basically you are saying: get part C, then get part B, then get part A.
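To make the ordering visible, here is a minimal sketch of the same chain shape. The class `SequentialChainSketch`, the `fetchPart` helper, and the `events` log are hypothetical names introduced for illustration, and a `String` stands in for each part. Because the function passed to `thenCompose` runs only after the previous stage has completed, the next fetch cannot even be submitted before the previous one has finished.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class SequentialChainSketch {

    // Thread-safe log of what happened, in order (hypothetical helper).
    static final List<String> events = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for a DAO call: logs start and end around a short pause.
    static CompletableFuture<String> fetchPart(String name) {
        return CompletableFuture.supplyAsync(() -> {
            events.add("start " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("end " + name);
            return name;
        });
    }

    // Same shape as the chained fetchSingle: each fetch is created
    // only after the previous stage has completed.
    static CompletableFuture<String> chained() {
        return fetchPart("C")
                .thenCompose(c -> fetchPart("B"))
                .thenCompose(b -> fetchPart("A"));
    }

    public static void main(String[] args) {
        chained().join();
        // prints [start C, end C, start B, end B, start A, end A]
        System.out.println(events);
    }
}
```

Note that `chained()` itself returns right away; it is the three fetches for one item that run one after another.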

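Instead, you should call the 3 methods and then merge the results, although merging may not be necessary here because you simply store each result on the Item. Mind the Java Memory Model, though: your Item is not synchronized, so this may not work properly in more complex examples.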

So, basically:

private CompletableFuture<Item> fetchSingle(Long id) {
    Item result = new Item(id);
    CompletableFuture<?> c = getPartCAndSetOnItem(result);
    CompletableFuture<?> b = getPartBAndSetOnItem(result);
    CompletableFuture<?> a = getPartAAndSetOnItem(result);
    return CompletableFuture.allOf(a, b, c).thenApply(__ -> result);
}
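If the unsynchronized Item is a concern, one possible variant merges the three results into an immutable object instead of mutating a shared one. This is only a sketch under assumptions: `MergeSketch`, `Whole`, and `fetchPart` are hypothetical names, and a `String` plays the role of each PartX. The parts are handed over through the futures themselves rather than through setters.

```java
import java.util.concurrent.CompletableFuture;

class MergeSketch {

    // Hypothetical immutable counterpart of Item: all fields are final and set once.
    static final class Whole {
        final long id;
        final String partA;
        final String partB;
        final String partC;

        Whole(long id, String partA, String partB, String partC) {
            this.id = id;
            this.partA = partA;
            this.partB = partB;
            this.partC = partC;
        }

        @Override
        public String toString() {
            return "Whole[id=" + id + ", " + partA + ", " + partB + ", " + partC + "]";
        }
    }

    // Hypothetical stand-in for dao.getPartX(id).
    static CompletableFuture<String> fetchPart(String name, long id) {
        return CompletableFuture.supplyAsync(() -> name + "-" + id);
    }

    static CompletableFuture<Whole> fetchSingle(long id) {
        // Start all three calls before waiting on any of them.
        CompletableFuture<String> a = fetchPart("A", id);
        CompletableFuture<String> b = fetchPart("B", id);
        CompletableFuture<String> c = fetchPart("C", id);
        // Once allOf has completed normally, each join() returns immediately.
        return CompletableFuture.allOf(a, b, c)
                .thenApply(ignored -> new Whole(id, a.join(), b.join(), c.join()));
    }

    public static void main(String[] args) {
        System.out.println(fetchSingle(7L).join()); // prints Whole[id=7, A-7, B-7, C-7]
    }
}
```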

Of course, the downside is that all 3 calls are executed even if one of them fails, but you cannot have your cake and eat it too...
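That trade-off can be observed with a small sketch. The names `FailureSketch`, `fetchPart`, and `callsStarted` are hypothetical and not part of the original code. `CompletableFuture.allOf` waits for every given future to complete and then completes exceptionally if any of them failed, so all three calls run even though part B is missing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

class FailureSketch {

    // Counts how many (hypothetical) DAO calls actually ran.
    static final AtomicInteger callsStarted = new AtomicInteger();

    // Hypothetical DAO call that fails when 'fail' is true.
    static CompletableFuture<String> fetchPart(String name, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            callsStarted.incrementAndGet();
            if (fail) {
                throw new IllegalStateException("Part " + name + " not found");
            }
            return name;
        });
    }

    // Starts all three calls, then reports how the combined future ended.
    static String fetchAllParts() {
        CompletableFuture<String> a = fetchPart("A", false);
        CompletableFuture<String> b = fetchPart("B", true);
        CompletableFuture<String> c = fetchPart("C", false);
        try {
            CompletableFuture.allOf(a, b, c).join();
            return "ok";
        } catch (CompletionException e) {
            // join() wraps the original exception; the cause is the DAO failure.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAllParts());                        // prints failed: Part B not found
        System.out.println("calls started: " + callsStarted.get()); // prints calls started: 3
    }
}
```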

As a side note, your getPartXAndSetOnItem() methods can be simplified to

public CompletableFuture<Item> getPartXAndSetOnItem(Item item) {
    return dao.getPartX(item.getId()).thenApply(partX -> {
        item.setPartX(partX);
        return item;
    });
}

Or, considering that we do not care about the actual result type in fetchSingle():

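public CompletableFuture<?> getPartXAndSetOnItem(Item item) {
    // thenAccept passes the fetched part to the setter; thenRun takes a Runnable and would not compile here
    return dao.getPartX(item.getId()).thenAccept(item::setPartX);
}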
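For reference, here is a concrete, runnable instance of both simplifications for a single part. `CallbackSketch`, its reduced `Item`, and the `getPartA` helper are hypothetical stand-ins, with a `String` in place of PartA. A setter method reference needs `thenAccept`, which hands the value to a `Consumer`; `thenRun` takes a `Runnable` and receives no argument.

```java
import java.util.concurrent.CompletableFuture;

class CallbackSketch {

    // Reduced, hypothetical version of Item with a single part.
    static final class Item {
        private volatile String partA;

        void setPartA(String partA) {
            this.partA = partA;
        }

        String getPartA() {
            return partA;
        }
    }

    // Hypothetical stand-in for dao.getPartA(id).
    static CompletableFuture<String> getPartA(long id) {
        return CompletableFuture.supplyAsync(() -> "Part A for " + id);
    }

    // thenApply: set the part, then pass the item on as the result.
    static CompletableFuture<Item> withThenApply(Item item, long id) {
        return getPartA(id).thenApply(partA -> {
            item.setPartA(partA);
            return item;
        });
    }

    // thenAccept: set the part; the resulting future carries no value.
    static CompletableFuture<Void> withThenAccept(Item item, long id) {
        return getPartA(id).thenAccept(item::setPartA);
    }

    public static void main(String[] args) {
        Item first = withThenApply(new Item(), 1L).join();
        System.out.println(first.getPartA()); // prints Part A for 1

        Item second = new Item();
        withThenAccept(second, 2L).join();
        System.out.println(second.getPartA()); // prints Part A for 2
    }
}
```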