使用 Spring WebClient 的内存泄漏

问题描述 投票:0回答:2

我想遍历域名列表(数百万条记录),发送请求并接收响应,以确定它是否还活着。

我选择了一种反应式方法,我希望它能为大量主机提供只有几个线程的服务,但我注意到我的堆内存在不断增长,直到达到 OutOfMemory。

这是我的代码:

@Slf4j
@Component
@RequiredArgsConstructor
public static class DataLoader implements CommandLineRunner {

    private final ReactiveDomainNameRepository reactiveDomainNameRepository;

    @Override
    @SneakyThrows
    public void run(String... strings) {
        ReactorClientHttpConnector connector = getConnector(); // Trying to reuse connector instead of creating new each time

        reactiveDomainNameRepository.findAllByResourcesIsNull() // Flux<DomainEntity>. This basically streams data from MongoDB using reactive driver
                .publishOn(Schedulers.parallel())
                .flatMap(domain -> performRequest(connector, domain)) // If I remove this line everything starts working just fine
                .buffer(1000) // Little optimization. The problem with memory remains even if I don't use buffering.
                .flatMap(reactiveDomainNameRepository::saveAll)
                .subscribe();
    }

    private Mono<DomainEntity> performRequest(ReactorClientHttpConnector connector, DomainEntity domain) {
        return WebClient
                .builder()
                .clientConnector(connector)
                .baseUrl("http://" + domain.getHost())
                .build()
                .get()
                .exchange()
                .onErrorResume(error -> {
                    log.error("Error while requesting '{}': ", domain.getHost());

                    return Mono.empty();
                }) // Mono<ClientResponse>
                .flatMap(resp -> {

                    if (resp.statusCode() == OK) {
                        log.info("Host '{}' is ok", domain.getHost());
                    } else {
                        log.info("Host '{}' returned '{}' status code", domain.getHost(), resp.statusCode().value());
                    }

                    // Consuming response as described in Spring documentation. Tried also resp.toEntity(String.class) but got the same result
                    return resp.toEntity(Void.class)
                            .map(nothing -> domain);
                });
    }
}

这里是堆内存使用情况。不要注意 5:59 - 6:05 期间 - 这是应用程序停止处理数据的时间,因为我没有处理极端情况。通常,它只会不断增长,直到达到内存限制。

所以我基本上有两个问题:

  1. 我的代码有什么问题?
  2. 使用反应式方法向不同主机发出大量请求是个好主意吗?
out-of-memory rx-java reactive-programming rx-java2 spring-webflux
2个回答
3
投票

只需使用

retrieve()
而不是
exchange()
,您甚至不需要混乱的错误处理。

我知道这是一个迟到的回复,但我刚才遇到了完全相同的问题并遇到了你的问题,我只是想在这里留下这个可能的解决方案。 :)

并回答您的问题:

  1. 使用 exchange() 时,您负责处理连接和错误处理,不是很推荐,只有在您确实需要控制时才应使用。

  2. 好吧,你正在使用并行构建,那为什么不呢。


0
投票

我是反应式新手,这个问题很老,但希望对某人有所帮助。

正如您在代码示例中注意到的那样:

flatMap(domain -> performRequest(connector, domain))

是导致问题的原因。这是因为

flatMap
将创建多个 Web 请求并热切地订阅它们,这意味着如果您的 flux 中有 100 个元素,将尝试同时发生 100 个 Web 请求(Spring 会做一些限制,但它在这里不太相关) ,并且流程中的每个下游元素也会急切出现,并且在处理每个元素时,它仍然有对它的引用,阻止 GC 清理,因此内存问题。

https://medium.com/swlh/understanding-reactors-flatmap-operator-a6a7e62d3e95

另一种选择是

concatMap
它的行为类似于
flatMap
但是它并不急切,这意味着它会等待
performRequest
Mono在发出下一个请求之前完成,但是这会慢得多,但使用更少的内存.

Project Reactor 中的 flatMap、flatMapSequential 和 concatMap 有什么区别?

最后,为了提高性能,您可以使用

buffer
方法,您也使用该方法将请求分批处理成不会导致内存问题的块。

reactiveDomainNameRepository.findAllByResourcesIsNull()
        .publishOn(Schedulers.parallel()) // Not sure how this will impact, may need to move to after the Flux.fromIterable
        .buffer(1000) //Adjust value based on your requirements. Buffer into groups before making request
        .concatMap(domains -> { // Will wait for the batch to complete before calling for the next.
            return Flux.fromIterable(domains)
                    .flatMap(domain -> performRequest(connector, domain)) //Make all the requests and save the result in batches
                    .flatMap(reactiveDomainNameRepository::saveAll);
        })
        .subscribe()
© www.soinside.com 2019 - 2024. All rights reserved.