在CompletableFuture上同步使用acceptEither进行任务调度

问题描述 投票:1回答:1

我正在通过CompletableFuture API学习并发性。假设我有两项任务:一项耗时250毫秒,另一项耗时2500毫秒。在以下代码中:

        Supplier<List<Long>> supplyIds = () -> {
            sleep(200);
            return(Arrays.asList(1L, 2L, 3L));
        };

        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
            sleep(250);
            System.out.println("User2"+ Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () ->  idList.stream().map(User::new).collect(Collectors.toList());
            return(CompletableFuture.supplyAsync(userSupplier));
        };
        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
            sleep(2500);
            System.out.println("User2"+ Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
            return(CompletableFuture.supplyAsync(userSupplier));
        };
        Consumer<List<User>> displayer = users -> {
            users.forEach(System.out::println);
        };

        CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
        CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
        CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);

        users1.thenRun(()-> System.out.println("User 1"));
        users2.thenRun(()-> System.out.println("User 2"));
        users1.acceptEither(users2, displayer);

        sleep(6000);

我得到以下结果:

User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1

我知道代码正在同步运行,因为使用了相同的公共fork联接池线程,我们没有指定该线程。我对为什么先执行fetchUsers2任务然后执行fetchUsers1任务感到困惑(这似乎与每次运行都一致)。我假设由于在代码中首先在thenCompose上调用了fetchUsers1,因此将首先对其进行“排队”。

java concurrency completable-future completion-stage
1个回答
2
投票

文档中没有任何内容表明thenCompose的调用顺序很重要。

由于您定义了两个independent阶段,它们都仅取决于completableFuture,所以users1user2之间没有定义的顺序,并且所得的顺序仅取决于实现。

您可以在一个环境中可重复获得特定的订单,但在不同的环境中获得不同的订单。即使在您的环境中,也有可能在某些运行中获得不同的订单。如果启动线程在调用supplyAsync(supplyIds) 200毫秒后丢失了CPU,则它可能在调用thenCompose(fetchUsers1)之前立即执行用thenCompose(fetchUsers2)指定的操作。

当两个动作之间的顺序很重要时,您必须对它们之间的依赖性进行建模。

注意,同样的代码

users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);

定义完全独立的动作。由于acceptEither应用于users1users2,而不是由thenRun调用返回的完成阶段,因此它不依赖于打印语句的完成。这三个动作可以以任何顺序执行。

© www.soinside.com 2019 - 2024. All rights reserved.