如何通过使用ExecutorService和CompletableFuture控制线程号,立即将响应返回给弹簧通量客户端?

问题描述 投票:0回答:1

我需要通过基于Spring的Rest Service API的非阻塞io并行调用两个下游系统。但是第一个下游系统容量是一次10个请求,第二个下游系统容量是100个。

第一个下游系统输出到第二个下游系统,因此我可以向第二个系统提出更并行的请求,以加快处理速度。

第二个下游系统响应非常大,因此无法将所有响应具体存储在内存中,因此立即想将响应返回给客户端。

前工作流程:

enter image description here

示例代码:

@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getstream() {

    ExecutorService executor = Executors.newFixedThreadPool(10);

    List<CompletableFuture> list = new ArrayList<>();

    AtomicInteger ai = new AtomicInteger(1);
    RestTemplate restTemplate = new RestTemplate();

    for (int i = 0; i < 100; i++) {
        CompletableFuture<Object> cff = CompletableFuture.supplyAsync(

                () -> ai.getAndAdd(1) + " first downstream web service " +
                        restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get(), String.class)

        ).thenApplyAsync(v -> {

            Random r = new Random();
            Integer in = r.nextInt(1000);

            return v + " second downstream web service  " + in + " " + restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get() + 1, String.class) + " \n";
        }, executor);

        list.add(cff);
    }

    return Flux.fromStream(list.stream().map(m -> {
                try {
                    return m.get().toString();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return "";
            })
    );

}

此代码仅在我得到响应后才在前五个线程中起作用,所有线程均已完成该过程。但是,一旦我从第二个下游系统获得响应,就需要立即对客户端做出响应。

注意:上面的代码未使用二级线程池实现。

谢谢你。

spring-boot java-stream spring-webflux executorservice completable-future
1个回答
0
投票

如果您使用Spring-Webflux构建非阻塞系统,最好在示例中利用WebClient的功能。我创建了一个简单的测试应用程序,下面的代码段对我有用:

private final WebClient w = WebClient.create("http://localhost:8080/call"); // web client for external system


@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyClass> getstream() {
    return Flux
            .range(0, 100) // prepare initial 100 requests
            .window(10) // combine elements in batch of 10 (probably buffer will fit better, have a look)

            // .delayElements(Duration.ofSeconds(5)) for testing purpose you can use this function as well
            .doOnNext(flow -> log.info("Batch of 10 is ready")) // double check tells that batch is ready

            .flatMap(flow -> flow
                    // perform an external async call for each element in batch of 10
                    // they will be executed sequentially but there will not be any performance issues because
                    // calls are async. If you wish you can add .parallel() to the flow to make it parallel
                    .flatMap(element -> w.get().exchange())
                    .map(r -> r.bodyToMono(MyClass.class))
            )

            // subscribe to each response and throw received element further to the stream
            .flatMap(response -> Mono.create(s -> response.subscribe(s::success)))

            .window(1000) // batch of 1000 is ready
            .flatMap(flow -> flow
                    .flatMap(element -> w.get().exchange())
                    .map(r -> r.bodyToMono(MyClass.class))
            )
            .flatMap(response -> Mono.create(s -> response.subscribe(s::success)));
}

public static class MyClass {
    public Integer i;
}
© www.soinside.com 2019 - 2024. All rights reserved.