我的任务很简单,我需要用反应式方式查询elasticsearch滚动索引。
由于@Document不支持Spring EL的索引名称,如@Document(index = "indexName-#(new Date().format(yyyy-MM-dd))")
所以我尝试使用 ReactiveElasticsearchTemplate 调用 elasticsearch,它支持我在运行时更改索引名称。
但是由于数据量大于10000,所以我需要使用scroll来重复查询,直到获取所有数据。
我已经完成了第一个查询和滚动查询,它可以返回值。
但是我需要合并所有结果然后返回。
我怎样才能做到这一点?目前,当消费者仍在工作时,空结果已返回到前端。我怎样才能要求线程等待,直到消费者完成elasticsearch返回所有数据?谢谢。
public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to)
throws Exception {
List<ELKModel> result = new ArrayList<ELKModel>();
List<Long> total = new ArrayList<>();
List<Long> currentSize = new ArrayList<>();
List<String> scrollId = new ArrayList<>();
NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
sourceBuilder.withQuery(
QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
NativeSearchQuery query = sourceBuilder.build();
elasticsearchSupport
.scrollStart(query, ELKModel.class)
.map(ELKModelWrapper::valueFrom).
subscribe(
wrapper -> {
total.add(wrapper.getTotal());l
currentSize.add(wrapper.getCurrentSize());
result.addAll(wrapper.getResults());
scrollId.add(wrapper.getScrollId());
}).dispose();
while (currentSize.size() == 1 && total.size() == 1 && currentSize.get(0) < total.get(0)) {
elasticsearchSupport
.scrollContinue(scrollId.get(0), ELKModel.class)
.map(ELKModelWrapper::valueFrom)
.subscribe(
wrapper -> {
currentSize.add(0, currentSize.get(0) + wrapper.getCurrentSize());
result.addAll(wrapper.getResults());
scrollId.add(0, wrapper.getScrollId());
}).dispose();
}
return Flux.fromIterable(result);
}
由于您使用
Flux
的反应式方法,因此您不应该使用 while
语句进行阻塞。相反,您应该链接您的反应性操作。
expand
用于执行重复查询,直到收集完所有所需数据,根据总大小和当前大小的比较,仅当仍有数据剩余时才应用scrollContinue操作。
对代码进行了一些修改,但不一定可以运行。
public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to) {
NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
sourceBuilder.withQuery(
QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
NativeSearchQuery query = sourceBuilder.build();
return elasticsearchSupport.scrollStart(query, ELKModel.class)
.expand(wrapper -> {
if (wrapper.getTotal() > wrapper.getCurrentSize()) {
return elasticsearchSupport.scrollContinue(wrapper.getScrollId(), ELKModel.class)
.map(ELKModelWrapper::valueFrom);
} else {
return Flux.empty();
}
})
.flatMap(wrapper -> Flux.fromIterable(wrapper.getResults()));
}