我想要相当于在 tomcat 中设置此属性的东西。这会将线程池中的线程数设置为 100,因此可以并发处理的最大请求数为 100 个请求。
server.tomcat.maxthread=100
但是在webflux(netty webserver)中我找不到限制请求的配置属性。
这是我到目前为止所尝试过的
1 - 为
ReactiveWebServerFactory
定义自定义 bean。我创建了 2 个 API 端点来测试,第一个端点是阻塞 IO,第二个不是,但它一次只能处理一个请求,另一个请求必须等到第一个请求完成,尽管它应该运行在差异线。
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
factory.addServerCustomizers(builder -> builder.runOn(new NioEventLoopGroup(2)));
return factory;
}
2 - 设置我在文档中找到的这个属性,但它仍然可以在不同线程上处理两个以上的请求。 (我测试的方法和上面一样)
server.netty.max-keep-alive-requests=2
3 - 将tomcat maxthread设置为2,这个没有达到我预期的效果。
server.tomcat.maxthread=2
那么webflux中是否可以限制最大请求数。
我的测试 API
@RestController
public class CheckController {
@PostMapping("/test")
public Mono<Long> something() throws InterruptedException {
System.out.println("cores " + Runtime.getRuntime().availableProcessors());
return Mono.just(2L)
.flatMap(d -> simulateBlockingOperation())
.log();
}
@PostMapping("/test2")
public Mono<Long> something2() throws InterruptedException {
return Mono.just(1L)
.log();
}
private Mono<Long> simulateBlockingOperation() {
System.out.println("Current thread " + Thread.currentThread().getName());
int x = 0;
while(x!=1) {}
return Mono.just(2L);
}
}
更新:我通过创建另一个线程池并使用 .subscribeOn 来切换线程组并限制该线程组上的线程数量来临时解决该问题。但我觉得这种做法真的很肮脏。
@RestController
public class CheckController {
private final Scheduler customThreadPool;
public CheckController(Scheduler customThreadPool) {
this.customThreadPool = customThreadPool;
}
@Bean
public Scheduler reactiveRequestThreadPool() {
return Schedulers.newBoundedElastic(2, 2, "my-custom-thread");
}
@PostMapping("/test")
public Mono<Long> something() throws InterruptedException {
return Mono.just(2L)
.doOnNext(d -> getCurrentThread())
.flatMap(d -> simulateBlockingOperation())
.subscribeOn(customThreadPool)
.log();
}
@PostMapping("/test2")
public Mono<Long> something2() throws InterruptedException {
return Mono.just(1L)
.doOnNext(d -> getCurrentThread())
.subscribeOn(customThreadPool)
.log();
}
private Mono<Long> simulateBlockingOperation() {
int x = 0;
while(x!=1) {}
return Mono.just(2L);
}
private void getCurrentThread() {
System.out.println("current thread " + Thread.currentThread().getName());
}
}
这是给你的一个主意。创建一个过滤器,涵盖服务器提供的所有 API。在该过滤器中,有一个当前请求的静态原子计数器。每次收到请求时,请检查计数器,如果它不大于您的限制,则允许该请求并增加计数器。如果大于您的限制,则拒绝该请求。每次请求完成并且响应通过过滤器返回时 - 递减计数器。