如何在使用 Spring boot webclient 调用第三方 api 时阻止() Reactor Http 线程?

问题描述 投票:0回答:1
java spring-boot spring-webflux spring-webclient
1个回答
0
投票

Reactor 是一个非阻塞库,所以在 Spring http 线程池中阻塞它的事件是不正确的用法(PS 但一般情况下可能会有一些偶尔需要的情况,reactor api 非常有用);

简单来说,可以新建一个业务进程线程池来提交非阻塞的api请求;这是一些代码片段供参考。


@Component
public class MyApiClient {

    private final WebClient webClient;

    public MyApiClient(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.build();
    }

    public <R, P> R processRequest(String url, Class<R> type, HttpEntity<P> requestEntity, HttpMethod method) {
        HttpHeaders httpHeaders = requestEntity.getHeaders();
        Mono<R> result = getResult(method, url, httpHeaders, type);
        return blockMono(result);
    }

    private <R> Mono<R> getResult(HttpMethod method, String url, HttpHeaders httpHeaders, Class<R> type) {
        return webClient.method(method)
                .uri(url)
                .accept(MediaType.ALL)
                .contentType(MediaType.APPLICATION_JSON)
                .headers(headers -> headers.putAll(httpHeaders))
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError,
                        clientResponse -> Mono.error(new RuntimeException("Client error")))
                .onStatus(HttpStatus::is5xxServerError,
                        clientResponse -> Mono.error(new RuntimeException("Server error")))
                .bodyToMono(type);
    }

    private <R> R blockMono(Mono<R> mono) {
// just init a thread pool , any way you favorite is ok 
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit((Callable<R>) mono::block).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Error occurred while blocking Mono", e);
        } finally {
            executor.shutdown();
        }
    }
}

您处于 Spring 环境中,因此有必要将业务流程线程池初始化为连接到 Spring 的 bean,如下所示。


   @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("Bussiness-Pool-");
        executor.initialize();
        return executor;
    }

注释 bean 如下或构建默认构造方法以自动装配

  @Resource
    ThreadPoolTaskExecutor executor;
© www.soinside.com 2019 - 2024. All rights reserved.