我正在使用
spring-data-elasticsearch
,我只是想知道是否可以以某种方式将聚合查询作为 json 字符串发送。
所以查询看起来像这样:
{
"size": 0,
"aggs": {
"cspByState": {
"terms": {
"field": "properties.csp.keyword"
},
"aggregations": {
"levelType": {
"terms": {
"field": "level.keyword"
}
}
}
}
},
"query": {
"bool": {
"must": [{
"bool": {
"should": [{
"wildcard": {
"event": {
"value": "input.start.*"
}
}
},
{
"wildcard": {
"event": {
"value": "input.finish.*"
}
}
}
]
}
}
],
"filter": [
{
"range": {
"dateTime": {
"gte": "2021-03-01T00:00Z",
"lte": "2021-03-11T23:59:59.99Z",
"format": "strict_date_optional_time"
}
}
}
]
}
}
}
我尝试执行以下操作,但它总是在未知聚合上失败:
final WrapperQueryBuilder wrapper = new WrapperQueryBuilder(queryString);
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(wrapper);
searchSourceBuilder.size(0);
final SearchRequest searchRequest = new SearchRequest("csp-index");
searchRequest.source(searchSourceBuilder);
Flux.from(elasticsearchTemplate.execute(client -> client.aggregate(searchRequest))).collectList().block();
Caused by: ElasticsearchException[Elasticsearch exception [type=named_object_not_found_exception, reason=[2:11] unknown field [aggs]]]
at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:496)
用这些线
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(wrapper);
searchSourceBuilder.size(0);
您设置搜索请求的
query
部分和 size
部分。您不能将整个搜索请求作为查询参数传递。
聚合必须设置为
searchSourceBuilder.aggregation(aggregationBuilder)
我不知道 Elasticsearch 是否提供了解析 JSON 字符串的
AggregationBuilder
,我在快速查看 AggregationBuilders
类时没有看到它。
最后我用下面的代码做到了。主要是将
typed_keys
查询属性发送到 elastic,然后从 DefaultReactiveElasticsearchClient
获取一些方法来进行响应解析。
@Service
@Log4j2
@RequiredArgsConstructor
public class ElasticSearchService {
private final ReactiveElasticsearchOperations elasticsearchTemplate;
private final DfElasticSearchProperties elasticSearchProperties;
private final Mustache.Compiler compiler;
public Flux<Aggregation> transactionStatistics(LocalDate date) {
final Map<String, Object> parameters = new HashMap<>();
parameters.put("dateFrom", date.atStartOfDay(ZoneOffset.UTC));
parameters.put("dateTo", date.atTime(LocalTime.MAX).atZone(ZoneOffset.UTC));
final String queryString = compiler.loadTemplate("transaction-statistics").execute(parameters);
return aggregate(queryString);
}
private Flux<Aggregation> aggregate(String request) {
return Flux.from(elasticsearchTemplate.execute(client -> sendRequest(client, request)));
}
private Flux<Aggregation> sendRequest(ReactiveElasticsearchClient client, String request) {
return client.execute(c -> sendRequest(c, request))
.flatMapMany(c -> c.body(BodyExtractors.toMono(byte[].class))
.map(it -> new String(it, StandardCharsets.UTF_8))
.flatMap(json -> getSearchResponseFromJson(json)))
.flatMap(response -> Flux.fromIterable(response.getAggregations()));
}
private Mono<ClientResponse> sendRequest(WebClient client, String request) {
return client.method(HttpMethod.GET).uri(
builder -> builder.path("/").path(elasticSearchProperties.getCspEventsIndex()).path("/_search").queryParam(
"typed_keys").build()).contentType(MediaType.APPLICATION_JSON).bodyValue(request).exchange();
}
private static Mono<SearchResponse> getSearchResponseFromJson(String jsonResponse) {
try {
XContentParser parser = createParser(jsonResponse);
return Mono.just(SearchResponse.fromXContent(parser));
} catch (Exception e) {
return Mono.error(new ElasticsearchException(String.format("Unable to parse response [%s]", jsonResponse), e));
}
}
private static XContentParser createParser(String content) throws IOException {
return XContentType.fromMediaTypeOrFormat(MediaType.APPLICATION_JSON_VALUE) //
.xContent() //
.createParser(new NamedXContentRegistry(getDefaultNamedXContents()),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
}
}
仍然无法使用 Elasticsearch Java API Client 8 以 JSON 形式发送聚合。我完成了在代码中创建聚合并仅使用 JSON 形式的查询:
NativeQuery query = NativeQuery.builder()
// rewrite JSON aggregation into code
.withAggregation("xxx",
new Aggregation.Builder().terms(tb -> tb.field("field"))
.build())
.withQuery(new StringQuery(queryString))
.withMaxResults(0)
.build();
Flux<Aggregation> aggregation = elasticsearchTemplate
.aggregate(query, Object.class, IndexCoordinates.of("index-name"))
.cast(ElasticsearchAggregation.class)
.map(ElasticsearchAggregation::aggregation);
API 有点奇怪,因为几乎在任何地方你都可以使用构建器函数(例如
Function<TermsAggregation.Builder, ObjectBuilder<TermsAggregation>> fn
)。但
withAggregation
只需要Aggregation
而不是Function<Aggregation.Builder, Aggregation.Builder>
,所以你必须打电话给build()
。