Reactor 是一个非阻塞库,所以在 Spring http 线程池中阻塞它的事件是不正确的用法(PS 但一般情况下可能会有一些偶尔需要的情况,reactor api 非常有用);
简单来说,可以新建一个业务进程线程池来提交非阻塞的api请求;这是一些代码片段供参考。
@Component
public class MyApiClient {
private final WebClient webClient;
public MyApiClient(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.build();
}
public <R, P> R processRequest(String url, Class<R> type, HttpEntity<P> requestEntity, HttpMethod method) {
HttpHeaders httpHeaders = requestEntity.getHeaders();
Mono<R> result = getResult(method, url, httpHeaders, type);
return blockMono(result);
}
private <R> Mono<R> getResult(HttpMethod method, String url, HttpHeaders httpHeaders, Class<R> type) {
return webClient.method(method)
.uri(url)
.accept(MediaType.ALL)
.contentType(MediaType.APPLICATION_JSON)
.headers(headers -> headers.putAll(httpHeaders))
.retrieve()
.onStatus(HttpStatus::is4xxClientError,
clientResponse -> Mono.error(new RuntimeException("Client error")))
.onStatus(HttpStatus::is5xxServerError,
clientResponse -> Mono.error(new RuntimeException("Server error")))
.bodyToMono(type);
}
private <R> R blockMono(Mono<R> mono) {
// just init a thread pool , any way you favorite is ok
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
return executor.submit((Callable<R>) mono::block).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Error occurred while blocking Mono", e);
} finally {
executor.shutdown();
}
}
}
您处于 Spring 环境中,因此有必要将业务流程线程池初始化为连接到 Spring 的 bean,如下所示。
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Bussiness-Pool-");
executor.initialize();
return executor;
}
注释 bean 如下或构建默认构造方法以自动装配
@Resource
ThreadPoolTaskExecutor executor;