Parsing an aggregation JSON query with the Elasticsearch Java client

Question (votes: 0, answers: 3)

I am using spring-data-elasticsearch, and I would like to know whether an aggregation query can somehow be sent as a JSON string.

The query looks like this:

{
  "size": 0,
  "aggs": {
    "cspByState": {
      "terms": {
        "field": "properties.csp.keyword"
      },
      "aggregations": {
        "levelType": {
          "terms": {
            "field": "level.keyword"
          }
        }
      }
    }
  },
  "query": {
    "bool": {
      "must": [{
          "bool": {
            "should": [{
                "wildcard": {
                  "event": {
                    "value": "input.start.*"
                  }
                }
              },
              {
                "wildcard": {
                  "event": {
                    "value": "input.finish.*"
                  }
                }
              }
            ]
          }
        }
      ],
      "filter": [
        {
          "range": {
            "dateTime": {
              "gte": "2021-03-01T00:00Z",
              "lte": "2021-03-11T23:59:59.99Z",
              "format": "strict_date_optional_time"
            }
          }
        }
      ]
    }
  }
}

I tried the following, but it always fails with an unknown field error on aggs:

final WrapperQueryBuilder wrapper = new WrapperQueryBuilder(queryString);
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(wrapper);
searchSourceBuilder.size(0);
final SearchRequest searchRequest = new SearchRequest("csp-index");
searchRequest.source(searchSourceBuilder);

Flux.from(elasticsearchTemplate.execute(client -> client.aggregate(searchRequest))).collectList().block();

Caused by: ElasticsearchException[Elasticsearch exception [type=named_object_not_found_exception, reason=[2:11] unknown field [aggs]]]
    at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:496)

Tags: elasticsearch spring-data-elasticsearch

3 Answers

0 votes

With these lines

final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(wrapper);
searchSourceBuilder.size(0);

you set the query part and the size part of the search request. You cannot pass the whole search request as the query parameter.

Aggregations have to be set with

searchSourceBuilder.aggregation(aggregationBuilder)

I don't know whether Elasticsearch provides an AggregationBuilder that parses a JSON string; I didn't see one when I took a quick look at the AggregationBuilders class.
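
To illustrate the point about the query part, the sketch below pulls the raw value of one top-level key out of a search body using only the JDK, so that just the "query" clause could be handed to WrapperQueryBuilder while the aggregations are still set separately. The names SearchBodySplitter and topLevelValue are made up for this example, it assumes well-formed JSON with no escaped characters in key names, and a real project would more likely use a JSON library such as Jackson.

```java
/** Hypothetical helper, not part of Elasticsearch or Spring Data. */
final class SearchBodySplitter {

    /** Returns the raw JSON text stored under a top-level key, or null if the key is absent. */
    static String topLevelValue(String json, String key) {
        int depth = 0;
        int i = 0;
        while (i < json.length()) {
            char c = json.charAt(i);
            if (c == '"') {
                int end = endOfString(json, i);
                String text = json.substring(i + 1, end);
                i = end + 1;
                // Only strings directly inside the root object can be top-level keys.
                if (depth == 1 && text.equals(key)) {
                    int colon = skipWhitespace(json, i);
                    if (colon < json.length() && json.charAt(colon) == ':') {
                        int start = skipWhitespace(json, colon + 1);
                        return json.substring(start, endOfValue(json, start));
                    }
                }
                continue;
            }
            if (c == '{' || c == '[') depth++;
            if (c == '}' || c == ']') depth--;
            i++;
        }
        return null;
    }

    /** Index of the closing quote of the string whose opening quote is at {@code open}. */
    private static int endOfString(String s, int open) {
        int i = open + 1;
        while (i < s.length() && s.charAt(i) != '"') {
            i += (s.charAt(i) == '\\') ? 2 : 1; // skip escaped characters
        }
        if (i >= s.length()) {
            throw new IllegalArgumentException("unterminated string");
        }
        return i;
    }

    /** Index just past the JSON value that starts at {@code start}. */
    private static int endOfValue(String s, int start) {
        char first = s.charAt(start);
        if (first == '"') {
            return endOfString(s, start) + 1;
        }
        if (first == '{' || first == '[') {
            int depth = 0;
            for (int i = start; i < s.length(); i++) {
                char c = s.charAt(i);
                if (c == '"') {
                    i = endOfString(s, i); // jump to the closing quote
                    continue;
                }
                if (c == '{' || c == '[') depth++;
                if (c == '}' || c == ']') {
                    depth--;
                    if (depth == 0) return i + 1;
                }
            }
            throw new IllegalArgumentException("unbalanced JSON");
        }
        // Number, true, false or null: runs until a separator or whitespace.
        int i = start;
        while (i < s.length() && ",}] \t\r\n".indexOf(s.charAt(i)) < 0) i++;
        return i;
    }

    private static int skipWhitespace(String s, int i) {
        while (i < s.length() && Character.isWhitespace(s.charAt(i))) i++;
        return i;
    }
}
```

For the body in the question, topLevelValue(queryString, "query") would return only the bool clause, which is the part WrapperQueryBuilder can parse; the "aggs" section still has to go through searchSourceBuilder.aggregation(...).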


0 votes

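In the end I got it working with the code below. The main points are to send the typed_keys query parameter to Elasticsearch and then to borrow a few methods from DefaultReactiveElasticsearchClient to parse the response.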

@Service
@Log4j2
@RequiredArgsConstructor
public class ElasticSearchService {
  
  private final ReactiveElasticsearchOperations elasticsearchTemplate;
  private final DfElasticSearchProperties elasticSearchProperties;
  private final Mustache.Compiler compiler;
  
  public Flux<Aggregation> transactionStatistics(LocalDate date) {
    final Map<String, Object> parameters = new HashMap<>();
    parameters.put("dateFrom", date.atStartOfDay(ZoneOffset.UTC));
    parameters.put("dateTo", date.atTime(LocalTime.MAX).atZone(ZoneOffset.UTC));
    
    final String queryString = compiler.loadTemplate("transaction-statistics").execute(parameters);
    
    return aggregate(queryString);
  }
  
  private Flux<Aggregation> aggregate(String request) {
    return Flux.from(elasticsearchTemplate.execute(client -> sendRequest(client, request)));
  }
  
  private Flux<Aggregation> sendRequest(ReactiveElasticsearchClient client, String request) {
    return client.execute(c -> sendRequest(c, request))
             .flatMapMany(c -> c.body(BodyExtractors.toMono(byte[].class))
                                 .map(it -> new String(it, StandardCharsets.UTF_8))
                                 .flatMap(json -> getSearchResponseFromJson(json)))
             .flatMap(response -> Flux.fromIterable(response.getAggregations()));
  }
  
  private Mono<ClientResponse> sendRequest(WebClient client, String request) {
    return client.method(HttpMethod.GET).uri(
      builder -> builder.path("/").path(elasticSearchProperties.getCspEventsIndex()).path("/_search").queryParam(
        "typed_keys").build()).contentType(MediaType.APPLICATION_JSON).bodyValue(request).exchange();
  }
  
  private static Mono<SearchResponse> getSearchResponseFromJson(String jsonResponse) {
    try {
      XContentParser parser = createParser(jsonResponse);
      return Mono.just(SearchResponse.fromXContent(parser));
    } catch (Exception e) {
      return Mono.error(new ElasticsearchException(String.format("Unable to parse response [%s]", jsonResponse), e));
    }
  }
  
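  // getDefaultNamedXContents() is not defined in this snippet; it is presumably one of the
  // helpers taken from DefaultReactiveElasticsearchClient, as described above.
  private static XContentParser createParser(String content) throws IOException {
    return XContentType.fromMediaTypeOrFormat(MediaType.APPLICATION_JSON_VALUE) //
      .xContent() //
      .createParser(new NamedXContentRegistry(getDefaultNamedXContents()),
        DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
  }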
}
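
The effect of typed_keys can be shown without any Elasticsearch classes. Here is a minimal sketch, assuming a cluster reachable at a base URL you supply: it builds the raw _search request with the JDK HTTP client (Java 11+) and splits a typed aggregation key such as sterms#cspByState into its type and name, which is what the response parser needs in order to pick the right aggregation class. TypedKeysSketch, searchRequest and splitTypedKey are names invented here, and POST is used instead of the GET above because _search accepts both.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper, not part of Elasticsearch or Spring Data. */
final class TypedKeysSketch {

    /** Builds (but does not send) a _search request that asks for typed aggregation keys. */
    static HttpRequest searchRequest(String baseUrl, String index, String jsonBody) {
        URI uri = URI.create(baseUrl + "/" + index + "/_search?typed_keys=true");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /**
     * Splits a typed key of the form type#name into {type, name}.
     * If there is no '#', the type is returned as an empty string.
     */
    static String[] splitTypedKey(String typedKey) {
        int hash = typedKey.indexOf('#');
        if (hash < 0) {
            return new String[] {"", typedKey};
        }
        return new String[] {typedKey.substring(0, hash), typedKey.substring(hash + 1)};
    }
}
```

Without typed_keys the response would contain just "cspByState", and a generic parser would have no way to tell a terms aggregation from any other kind.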

0 votes

It is still not possible to send aggregations as JSON with the Elasticsearch Java API Client 8. I ended up creating the aggregations in code and using JSON only for the query:

NativeQuery query = NativeQuery.builder()
        // rewrite JSON aggregation into code
        .withAggregation("xxx",
            new Aggregation.Builder().terms(tb -> tb.field("field"))
                .build())
        .withQuery(new StringQuery(queryString))
        .withMaxResults(0)
        .build();

Flux<Aggregation> aggregation = elasticsearchTemplate
        .aggregate(query, Object.class, IndexCoordinates.of("index-name"))
        .cast(ElasticsearchAggregation.class)
        .map(ElasticsearchAggregation::aggregation);

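The API is a bit odd, because almost everywhere else you can pass a builder function (for example Function<TermsAggregation.Builder, ObjectBuilder<TermsAggregation>> fn). But withAggregation only accepts an Aggregation rather than a Function<Aggregation.Builder, Aggregation.Builder>, so you have to call build() yourself.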
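
The difference between the two API styles can be shown with stand-in types. This is only a sketch of the pattern: ObjectBuilder, TermsAgg and withAggregation below are simplified stand-ins written for this example, not the real classes from the Java API Client or Spring Data.

```java
import java.util.function.Function;

/** Stand-in types that only mimic the shape of the builder-function pattern. */
final class BuilderFunctionSketch {

    interface ObjectBuilder<T> {
        T build();
    }

    static final class TermsAgg {
        final String field;

        private TermsAgg(String field) {
            this.field = field;
        }

        static final class Builder implements ObjectBuilder<TermsAgg> {
            private String field;

            Builder field(String value) {
                this.field = value;
                return this;
            }

            @Override
            public TermsAgg build() {
                return new TermsAgg(field);
            }
        }

        /** Adapter: accepts a builder function and calls build() on the caller's behalf. */
        static TermsAgg of(Function<Builder, ObjectBuilder<TermsAgg>> fn) {
            return fn.apply(new Builder()).build();
        }
    }

    /** Like withAggregation in the answer: takes a finished object, not a function. */
    static String withAggregation(String name, TermsAgg agg) {
        return name + " -> terms(" + agg.field + ")";
    }
}
```

With such an adapter the call site becomes withAggregation("xxx", TermsAgg.of(b -> b.field("field"))), with no explicit build(). As far as I know the generated classes in the Java API Client expose a static of(...) factory of the same shape, but check the Aggregation javadoc before relying on it.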
