我需要更新现有数据。有没有比这更好的方法检索旧数据->修改旧数据->删除旧索引->创建新索引->批量插入新数据这似乎有点愚蠢。此外,我最终有大约。每个store.size
的index
高2倍。我不知道为什么会这样。
直接批量插入修改后的数据不起作用:docs.count
加倍。
有什么想法吗?
更新
这是我的批量插入内容:
var dataPointsBulkIndexOperationsPerBatchId = data.Select(
item => new BulkIndexOperation<T>(item)
{
Index = indexName
});
var allBulksRequest = new BulkRequest
{
Operations = new BulkOperationsCollection<IBulkOperation>(dataPointsBulkIndexOperationsPerBatchId),
Refresh = Refresh.True
};
if (allBulksRequest.Operations.Any())
{
var bulkResponse = elasticClient.Bulk(allBulksRequest);
bulkResponse.AssertResponseIsValidAndSuccessful();
}
为了在一个请求中更新许多文档,基本上有两个选择:
使用bulk API and sending a batch of update operations。每个更新操作都提供与更新API相同的选项,因此可以执行部分更新,脚本更新等。
脚本更新示例
var client = new ElasticClient();
var updates = new[] {
new { Id = 1, Counter = 3 },
new { Id = 2, Counter = 6 },
new { Id = 3, Counter = 5 },
new { Id = 4, Counter = 4 },
};
var bulkResponse = client.Bulk(b => b
.Index("my_index")
.UpdateMany(updates, (descriptor, update) => descriptor
.Id(update.Id)
.Script(s => s
.Source("ctx._source.counter += params.counter")
.Params(p => p
.Add("counter", update.Counter)
)
)
)
);
发送以下请求的
POST http://localhost:9200/my_index/_bulk
{"update":{"_id":1}}
{"script":{"source":"ctx._source.counter += params.counter","params":{"counter":3}}}
{"update":{"_id":2}}
{"script":{"source":"ctx._source.counter += params.counter","params":{"counter":6}}}
{"update":{"_id":3}}
{"script":{"source":"ctx._source.counter += params.counter","params":{"counter":5}}}
{"update":{"_id":4}}
{"script":{"source":"ctx._source.counter += params.counter","params":{"counter":4}}}
使用脚本更新,您可以通过_source
访问脚本中的ctx._source
文档,因此本示例在更新操作中将源文档的counter
字段增加counter
参数的值。 The default scripting language is called Painless,脚本可以根据需要复杂。建议如上所述对内联脚本进行参数化,以允许因编译脚本而产生的编译单元由Elasticsearch缓存和重用。
对于批量更新,您需要知道要更新的文档的ID,以便形成批量更新操作。
update by query API允许您对与查询匹配的一组文档执行脚本化更新。
脚本化更新对每个匹配的文档执行相同的脚本。在执行脚本式更新时,按查询更新和批量更新之间的关键区别在于,每次文档更新不能使用不同的参数值来参数化按查询更新;所有更新都执行相同的脚本更新。
通过查询更新的示例
var updateByQueryResponse = client.UpdateByQuery<object>(b => b
.Index("my_index")
.Query(q => q
.Ids(ids => ids
.Values(1,2,3,4)
)
)
.Script(s => s
.Source("ctx._source.counter += params.counter")
.Params(p => p
.Add("counter", 1)
)
)
);
发送以下请求的
POST http://localhost:9200/my_index/_update_by_query?pretty=true
{
"query": {
"ids": {
"values": [1, 2, 3, 4]
}
},
"script": {
"source": "ctx._source.counter += params.counter",
"params": {
"counter": 1
}
}
}
类似于脚本批量更新,您可以通过_source
访问脚本中的ctx._source
文档。
通过查询进行更新,您无需知道要更新的文档的ID;通过匹配所提供的查询(可以是用于更新所有文档的match_all
查询)来定位要更新的文档。