使用 Flux / Executor 服务并发执行一组操作

问题描述 投票:0回答:1

我有一个场景来处理客户号码列表并更新数据库中的详细信息

public void updateCustomerDetailsInDB(List<String> numberList)
for(String number : numberList){
    if(!isExistBy(number)) //Check the details of the customer in db by number
    {
    // if not exist call the customer retrieve API to get the customer details
    String response = customerClient.getCustomerRetrieveApi(number); // api call 
    dbService.persistCustomerDetails(response) // save customer details in db 
    }
}
// My CustomerRetrieveApi call using webclient 

       webClient.post()
                .uri(urlString)
                .contentType(mediaType)
                .bodyValue(requestObject)
                .retrieve()
                .bodyToMono(responseClass)
                .onErrorResume(WebClientResponseException.class,
                        ex -> ex.getRawStatusCode() == 404 ? Mono.empty() : Mono.error(ex))
                .block();

列表中将有 1000 个号码。如何使用执行器和通量并行执行它?哪种方法与代码实现更好?

我尝试使用@async方法和执行器,但它仅在单线程中运行。我需要至少 10 个线程(10 个数字)并行运行。

示例:在 for 循环中,1 个数字需要 1 秒才能完成该过程,因此 1000 个数字意味着 1000 秒,但我希望当批次为 10 时程序在 100 秒内执行。

spring-boot asynchronous spring-webflux project-reactor threadpoolexecutor
1个回答
0
投票

为了让 Reactor 优化任务调度,你必须避免诉诸

block
操作。一般来说,你应该尽量避免阻塞,因为Reactor的主要兴趣之一是提供“非阻塞”管道(提供阻塞是为了与外部代码/api兼容)。

就您而言,您可以:

  1. 使
    getCustomerRetrieveApi()
    返回 Mono,而不是阻塞
    public Mono<ResponseClass> getCustomerRetrieveApi() {
       webClient.post()
                .uri(urlString)
                .contentType(mediaType)
                .bodyValue(requestObject)
                .retrieve()
                .bodyToMono(responseClass)
                .onErrorResume(WebClientResponseException.class,
                        ex -> ex.getRawStatusCode() == 404 ? Mono.empty() : Mono.error(ex));
    
    }
    
  2. 使用
    Flux.fromIterable(numberList)
    flatMap
    使主循环具有反应性:
    /**
     * @return count of updated details.
     */
    public Mono<Long> updateCustomerDetailsInDB(List<String> numberList) {
        return Flux.fromIterable(numberList)
            .flatMap(number -> {
                if (isExistsBy(number)) return Mono.just(0L);
                return customerClient.getCustomerRetrieveApi(number)
                              .map(response -> { 
                                      dbService.persistCustomerDetails(response);
                                      return 1L;
                                   })
                              .defaultIfEmpty(0L);
            })
            .sum();
    }
    

现在这可能还不够,具体取决于

isExistBy
persistCustomDetails
的实现。但这是让 Reactor 将工作流程正确转换为非阻塞并发管道的第一个必要步骤。

© www.soinside.com 2019 - 2024. All rights reserved.