如何限制Spring webflux(netty webserver)中可以处理的请求数量

问题描述 投票:0回答:1

我想要相当于在 tomcat 中设置此属性的东西。这会将线程池中的线程数设置为 100,因此可以并发处理的最大请求数为 100 个请求。

server.tomcat.maxthread=100

但是在webflux(netty webserver)中我找不到限制请求的配置属性。

这是我到目前为止所尝试过的

1 - 为

ReactiveWebServerFactory
定义自定义 bean。我创建了 2 个 API 端点来测试,第一个端点是阻塞 IO,第二个不是,但它一次只能处理一个请求,另一个请求必须等到第一个请求完成,尽管它应该运行在差异线。

@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
        NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
        factory.addServerCustomizers(builder -> builder.runOn(new NioEventLoopGroup(2)));

        return factory;
}

2 - 设置我在文档中找到的这个属性,但它仍然可以在不同线程上处理两个以上的请求。 (我测试的方法和上面一样)

server.netty.max-keep-alive-requests=2

3 - 将tomcat maxthread设置为2,这个没有达到我预期的效果。

server.tomcat.maxthread=2

那么webflux中是否可以限制最大请求数。

我的测试 API

@RestController
public class CheckController {

    @PostMapping("/test")
    public Mono<Long> something() throws InterruptedException {
        System.out.println("cores " + Runtime.getRuntime().availableProcessors());
        return Mono.just(2L)
                .flatMap(d -> simulateBlockingOperation())
                .log();
    }

    @PostMapping("/test2")
    public Mono<Long> something2() throws InterruptedException {
        return Mono.just(1L)
                .log();
    }

    private Mono<Long> simulateBlockingOperation() {
        System.out.println("Current thread " + Thread.currentThread().getName());
        int x = 0;
        while(x!=1) {}
        return Mono.just(2L);
    }
}

更新:我通过创建另一个线程池并使用 .subscribeOn 来切换线程组并限制该线程组上的线程数量来临时解决该问题。但我觉得这种做法真的很肮脏。

@RestController
public class CheckController {

    private final Scheduler customThreadPool;

    public CheckController(Scheduler customThreadPool) {
        this.customThreadPool = customThreadPool;
    }
   @Bean
   public Scheduler reactiveRequestThreadPool() {
       return Schedulers.newBoundedElastic(2, 2, "my-custom-thread");
   }

    @PostMapping("/test")
    public Mono<Long> something() throws InterruptedException {
        return Mono.just(2L)
                .doOnNext(d -> getCurrentThread())
                .flatMap(d -> simulateBlockingOperation())
                .subscribeOn(customThreadPool)
                .log();
    }

    @PostMapping("/test2")
    public Mono<Long> something2() throws InterruptedException {
        return Mono.just(1L)
                .doOnNext(d -> getCurrentThread())
                .subscribeOn(customThreadPool)
                .log();
    }

    private Mono<Long> simulateBlockingOperation() {
        int x = 0;
        while(x!=1) {}
        return Mono.just(2L);
    }

    private void getCurrentThread() {
        System.out.println("current thread " + Thread.currentThread().getName());
    }
}
spring spring-boot spring-webflux netty
1个回答
0
投票

这是给你的一个主意。创建一个过滤器,涵盖服务器提供的所有 API。在该过滤器中,有一个当前请求的静态原子计数器。每次收到请求时,请检查计数器,如果它不大于您的限制,则允许该请求并增加计数器。如果大于您的限制,则拒绝该请求。每次请求完成并且响应通过过滤器返回时 - 递减计数器。

© www.soinside.com 2019 - 2024. All rights reserved.