I have data like this in Elasticsearch:
{"action":"B1", "status":"E", "name": "abc", "version":0, "colour": "red"}
{"action":"B1", "status":"E", "name": "def", "version":1, "colour": "red"}
{"action":"B1", "status":"E", "name": "was", "version":2, "colour": "red"}
{"action":"B2", "status":"V", "name": "acc", "version":0, "colour": "red"}
{"action":"B3", "status":"E", "name": "fff", "version":0, "colour": "red"}
{"action":"B3", "status":"V", "name": "ttt", "version":1, "colour": "red"}
{"action":"B4", "status":"V", "name": "ttt", "version":1, "colour": "blue"}
The requirement is:
For all red items, I need to group by action name and, within each action group, keep every document with status V plus the single status-E document with the highest version.
The correct result is (sorted by name ASC):
{"action":"B2", "status":"V", "name": "acc", "version":0} //returned because all documents with status V should be returned
{"action":"B3", "status":"E", "name": "fff", "version":0} //B3 has two documents; this is the status-E document with the highest version
{"action":"B3", "status":"V", "name": "ttt", "version":1} //B3 has two documents; this one has status V, and status-V documents are always added to the results
{"action":"B1", "status":"E", "name": "was", "version":2} //B1 has several documents, all with status E, so only the one with the highest version is returned
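As a reference for what any query has to reproduce, the selection rule can be modelled in a few lines of plain Python. This is only a sketch of the requirement, not Elasticsearch code; the function name `select_documents` and the in-memory `DOCS` list are assumptions made for illustration.

```python
# Sample documents copied from the question.
DOCS = [
    {"action": "B1", "status": "E", "name": "abc", "version": 0, "colour": "red"},
    {"action": "B1", "status": "E", "name": "def", "version": 1, "colour": "red"},
    {"action": "B1", "status": "E", "name": "was", "version": 2, "colour": "red"},
    {"action": "B2", "status": "V", "name": "acc", "version": 0, "colour": "red"},
    {"action": "B3", "status": "E", "name": "fff", "version": 0, "colour": "red"},
    {"action": "B3", "status": "V", "name": "ttt", "version": 1, "colour": "red"},
    {"action": "B4", "status": "V", "name": "ttt", "version": 1, "colour": "blue"},
]


def select_documents(docs, colour="red"):
    """Hypothetical helper: keep all status-V documents and, per action,
    only the status-E document with the highest version."""
    kept_v = []
    best_e = {}  # action -> best status-E document seen so far
    for doc in docs:
        if doc["colour"] != colour:
            continue
        if doc["status"] == "V":
            kept_v.append(doc)
        elif doc["status"] == "E":
            current = best_e.get(doc["action"])
            if current is None or doc["version"] > current["version"]:
                best_e[doc["action"]] = doc
    # Sort the combined result by name ASC, as the requirement asks.
    return sorted(kept_v + list(best_e.values()), key=lambda d: d["name"])


selected = select_documents(DOCS)
print([d["name"] for d in selected])  # ['acc', 'fff', 'ttt', 'was']
```

The four names match the expected result listed above, and `len(selected)` is the total (4) that the paginator should display.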
I want to search the documents. My query looks like this:
GET myIndex/_search
{
  "from": 0,
  "size": 10,
  "query": {
    "bool": {
      "filter": [
        {
          "query_string": {
            "query": "red",
            "fields": [
              "colour"
            ]
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1.0
    }
  },
  "version": true,
  "explain": false,
  "aggregations": {
    "all_items": {
      "terms": {
        "field": "action",
        "size": 10, //I AM NOT SURE WHAT THIS SIZE SPECIFIES ? I want first page of 10 elements
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          {
            "_count": "desc"
          },
          {
            "_key": "asc"
          }
        ]
      },
      "aggregations": {
        "bbb": {
          "top_hits": {
            "from": 0,
            "size": 1,
            "version": false,
            "seq_no_primary_term": false,
            "explain": false,
            "fields": [
              {
                "field": "version"
              }
            ]
          }
        }
      }
    }
  }
}
My current result is wrong. The problems:
1)
"hits" : {
"total" : {
"value" : 6,
"relation" : "eq"
}
hits.hits
TotalHits=6 is greater than the number of documents in the buckets (3), so I cannot build the pagination information (my paginator should show a total of 4 elements).
{"action":"B1", "status":"E", "name": "abc", "version":0, "colour": "red"}
{"action":"B1", "status":"E", "name": "def", "version":1, "colour": "red"}
{"action":"B1", "status":"E", "name": "was", "version":2, "colour": "red"}
{"action":"B2", "status":"V", "name": "acc", "version":0, "colour": "red"}
{"action":"B3", "status":"E", "name": "fff", "version":0, "colour": "red"}
{"action":"B3", "status":"V", "name": "ttt", "version":1, "colour": "red"}
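2) In the resulting buckets, the documents are grouped by action:
Bucket with key B1 //OK
{"action":"B1", "status":"E", "name": "was", "version":2, "colour": "red"}
Bucket with key B2 //OK
{"action":"B2", "status":"V", "name": "acc", "version":0, "colour": "red"}
Bucket with key B3 //WRONG
{"action":"B3", "status":"V", "name": "ttt", "version":1, "colour": "red"}
So I am missing this document:
{"action":"B3", "status":"E", "name": "fff", "version":0, "colour": "red"}
because the aggregation took only the document with the highest version from this bucket. According to the requirement, however, I should take all documents with status V and the last document (by version) with status E.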
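Any help would be appreciated.
For the missing status-V documents I have some ideas, for example introducing a new grouping field (so that I would group by newGroupingDiscriminatorField instead of by the action field) and assigning suitable values to that field. Two things remain unclear to me:
1) I don't know the correct totalCount. I should show the user that he is looking at elements 1-10 of 120, where 120 means the number of elements ready to be displayed after grouping and after the unnecessary items have been eliminated, not the number before grouping.
2) I don't know which part of the query is responsible for fetching only the first page of all the grouped elements. The first page has 10 elements, but to get only the first page, do I first need to group all the documents?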
This can be solved with a scripted_metric aggregation.
Your documents
PUT /complex_selection/_bulk
{"create":{"_id":1}}
{"action":"B1", "status":"E", "name": "was", "version": 2, "colour": "red"}
{"create":{"_id":2}}
{"action":"B1", "status":"E", "name": "abc", "version": 0, "colour": "red"}
{"create":{"_id":3}}
{"action":"B1", "status":"E", "name": "def", "version": 1, "colour": "red"}
{"create":{"_id":4}}
{"action":"B2", "status":"V", "name": "acc", "version": 0, "colour": "red"}
{"create":{"_id":5}}
{"action":"B3", "status":"E", "name": "fff", "version": 0, "colour": "red"}
{"create":{"_id":6}}
{"action":"B3", "status":"V", "name": "ttt", "version": 1, "colour": "red"}
{"create":{"_id":7}}
{"action":"B4", "status":"V", "name": "ttt", "version": 1, "colour": "blue"}
Query with scripted_metric
POST /complex_selection/_search?filter_path=aggregations
{
  "query": {
    "term": {
      "colour": {
        "value": "red"
      }
    }
  },
  "aggs": {
    "selected_documents": {
      "scripted_metric": {
        "init_script": "state.actions = [:];",
        "map_script": """
          def action = params['_source']['action'];
          def status = params['_source']['status'];
          def document = params['_source'];
          // state.actions maps action -> status -> list of documents
          if (state.actions[action] == null) {
            state.actions[action] = [:];
          }
          if (state.actions[action][status] == null) {
            state.actions[action][status] = [];
          }
          if (status == 'E') {
            // For status E keep only the document with the highest version
            if (state.actions[action][status].size() > 0) {
              def mapDocumentVersion = state.actions[action][status][0]['version'];
              def documentVersion = document['version'];
              if (mapDocumentVersion >= documentVersion) {
                return;
              }
            }
            state.actions[action][status] = [];
          }
          // Status V documents are always appended
          state.actions[action][status].add(document);
        """,
        "combine_script": "return state.actions",
        "reduce_script": """
          // Simplified merge: correct only when there is a single shard
          Map selectedDocuments = new HashMap();
          for (state in states) {
            selectedDocuments.putAll(state);
          }
          // Flatten action -> status -> documents into one list
          List documents = [];
          Set mapActions = selectedDocuments.keySet();
          for (mapAction in mapActions) {
            Set mapStatuses = selectedDocuments[mapAction].keySet();
            for (mapStatus in mapStatuses) {
              List value = selectedDocuments[mapAction][mapStatus];
              documents.addAll(value);
            }
          }
          return documents;
        """
      }
    }
  }
}
Response
{
  "aggregations" : {
    "selected_documents" : {
      "value" : [
        {
          "colour" : "red",
          "name" : "acc",
          "action" : "B2",
          "version" : 0,
          "status" : "V"
        },
        {
          "colour" : "red",
          "name" : "fff",
          "action" : "B3",
          "version" : 0,
          "status" : "E"
        },
        {
          "colour" : "red",
          "name" : "ttt",
          "action" : "B3",
          "version" : 1,
          "status" : "V"
        },
        {
          "colour" : "red",
          "name" : "was",
          "action" : "B1",
          "version" : 2,
          "status" : "E"
        }
      ]
    }
  }
}
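Regarding the total count and the first page: neither hits.total nor the terms "size" in the original query describes the grouped result (the terms "size" only limits how many buckets are returned). Because the scripted_metric response already contains the complete selected list, one possible approach is to derive both values on the client. The Python sketch below assumes the whole list fits comfortably in one response; the helper name `paginate` and its parameters are made up for illustration.

```python
def paginate(documents, page, page_size=10):
    """Hypothetical helper: return (total, items) for a 1-based page
    of the already-grouped documents."""
    ordered = sorted(documents, key=lambda doc: doc["name"])  # name ASC
    start = (page - 1) * page_size
    return len(ordered), ordered[start:start + page_size]


# The "value" array from the response above, trimmed to the fields used here.
selected = [
    {"name": "acc", "action": "B2", "version": 0, "status": "V"},
    {"name": "fff", "action": "B3", "version": 0, "status": "E"},
    {"name": "ttt", "action": "B3", "version": 1, "status": "V"},
    {"name": "was", "action": "B1", "version": 2, "status": "E"},
]

total, first_page = paginate(selected, page=1, page_size=2)
print(total, [doc["name"] for doc in first_page])  # 4 ['acc', 'fff']
```

With a page size of 10 the same call returns all four documents and a total of 4, which is the number the paginator should show. For very large result sets, returning everything from the aggregation may be too expensive, so treat this as a starting point only.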
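Note: this query works on a single-shard index. The reduce_script stage is simplified and is not correct for a multi-shard index, because putAll replaces an action's entry from one shard with the entry for the same action from another shard instead of merging them.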
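To illustrate what a shard-aware reduce step would have to do, here is a plain-Python model of the merge. It is not Painless and has not been run inside Elasticsearch; the per-shard state layout (action -> status -> list of documents) mirrors what combine_script returns in the query above, while the function name `merge_shard_states` and the two sample shard states are assumptions.

```python
def merge_shard_states(states):
    """Hypothetical merge: concatenate status-V lists across shards and keep
    only the highest-version status-E document per action."""
    merged = {}  # action -> status -> list of documents
    for shard_actions in states:
        if not shard_actions:
            continue  # skip empty shard results
        for action, by_status in shard_actions.items():
            target = merged.setdefault(action, {})
            for status, docs in by_status.items():
                if status == "E":
                    candidates = target.get("E", []) + list(docs)
                    if candidates:
                        best = max(candidates, key=lambda d: d["version"])
                        target["E"] = [best]
                else:
                    target.setdefault(status, []).extend(docs)
    return [
        doc
        for by_status in merged.values()
        for docs in by_status.values()
        for doc in docs
    ]


# Two made-up shard states in which B1 and B3 are split across shards.
shard_a = {
    "B1": {"E": [{"action": "B1", "status": "E", "name": "def", "version": 1}]},
    "B3": {"V": [{"action": "B3", "status": "V", "name": "ttt", "version": 1}]},
}
shard_b = {
    "B1": {"E": [{"action": "B1", "status": "E", "name": "was", "version": 2}]},
    "B2": {"V": [{"action": "B2", "status": "V", "name": "acc", "version": 0}]},
    "B3": {"E": [{"action": "B3", "status": "E", "name": "fff", "version": 0}]},
}

result = merge_shard_states([shard_a, shard_b])
print(sorted(doc["name"] for doc in result))  # ['acc', 'fff', 'ttt', 'was']
```

With a putAll-style merge, the B3 entry from the second shard would replace the one from the first and "ttt" would be lost; merging per action and per status keeps it. The same structure could be ported to the reduce_script, but that port would need to be tested on a real multi-shard index.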