我如何才能等到所有物品都被消耗完

问题描述 投票:0回答:1

我的任务很简单,我需要用反应式方式查询elasticsearch滚动索引。

由于@Document不支持Spring EL的索引名称,如@Document(index = "indexName-#(new Date().format(yyyy-MM-dd))")

所以我尝试使用 ReactiveElasticsearchTemplate 调用 elasticsearch,它支持我在运行时更改索引名称。

但是由于数据量大于10000,所以我需要使用scroll来重复查询,直到获取所有数据。

我已经完成了第一个查询和滚动查询,它可以返回值。

但是我需要合并所有结果然后返回。

我怎样才能做到这一点?目前,当消费者仍在工作时,空结果已返回到前端。我怎样才能要求线程等待,直到消费者完成elasticsearch返回所有数据?谢谢。

public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to)
  throws Exception {
List<ELKModel> result = new ArrayList<ELKModel>();
List<Long> total = new ArrayList<>();
List<Long> currentSize = new ArrayList<>();
List<String> scrollId = new ArrayList<>();

NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
sourceBuilder.withQuery(
    QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
NativeSearchQuery query = sourceBuilder.build();
 elasticsearchSupport
    .scrollStart(query, ELKModel.class)
    .map(ELKModelWrapper::valueFrom).
        subscribe(
        wrapper -> {
          total.add(wrapper.getTotal());l
          currentSize.add(wrapper.getCurrentSize());
          result.addAll(wrapper.getResults());
          scrollId.add(wrapper.getScrollId());
        }).dispose();

while (currentSize.size() == 1 && total.size() == 1 && currentSize.get(0) < total.get(0)) {
    elasticsearchSupport
      .scrollContinue(scrollId.get(0), ELKModel.class)
      .map(ELKModelWrapper::valueFrom)
      .subscribe(
          wrapper -> {
            currentSize.add(0, currentSize.get(0) + wrapper.getCurrentSize());
            result.addAll(wrapper.getResults());
            scrollId.add(0, wrapper.getScrollId());
          }).dispose();
          
}

return Flux.fromIterable(result);

}

java reactive-programming spring-data-elasticsearch
1个回答
0
投票

由于您使用

Flux
的反应式方法,因此您不应该使用
while
语句进行阻塞。相反,您应该链接您的反应性操作。

expand
用于执行重复查询,直到收集完所有所需数据,根据总大小和当前大小的比较,仅当仍有数据剩余时才应用scrollContinue操作。

对代码进行了一些修改,但不一定可以运行。

public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to) {

    NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
    sourceBuilder.withQuery(
            QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
    sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
    NativeSearchQuery query = sourceBuilder.build();

    return elasticsearchSupport.scrollStart(query, ELKModel.class)
            .expand(wrapper -> {
                if (wrapper.getTotal() > wrapper.getCurrentSize()) {
                    return elasticsearchSupport.scrollContinue(wrapper.getScrollId(), ELKModel.class)
                            .map(ELKModelWrapper::valueFrom);
                } else {
                    return Flux.empty();
                }
            })
            .flatMap(wrapper -> Flux.fromIterable(wrapper.getResults()));
}
© www.soinside.com 2019 - 2024. All rights reserved.