我正在使用 python 生成大量具有随机内容的 elasticsearch 文档,并使用 elasticsearch-py 对它们进行索引。
简化的工作示例(只有一个字段的文档):
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
es_client.index(index='my_index', document=document)
由于这会对每个文档发出一个请求,因此我尝试通过使用
_bulk
API 发送 1000 个文档块来加快速度。然而,到目前为止我的尝试都没有成功。
我对文档的理解是,你可以将一个迭代传递给
bulk()
,所以我尝试了:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
es_client.bulk(operations=document_list, index='my_index')
document_list = []
但这会导致
elasticsearch.BadRequestError:BadRequestError(400,'illegal_argument_exception','格式错误的操作/元数据行 [1],应为 START_OBJECT 或 END_OBJECT,但发现为 [VALUE_STRING]')
helpers.bulk()
和Elasticsearch.bulk()
。两者都可以用来实现我想要做的事情,但它们的签名略有不同。
helpers.bulk()
函数采用 Elasticsearch()
对象和包含文档的可迭代对象作为参数。该操作可以指定为 _op_type
,并且可以是 index
、create
、delete
或 update
之一。由于 _op_type
默认为 index
,因此在本例中我们可以省略它并简单地传递文档列表:
from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(32)}
document_list.append(document)
if i % 1000 == 0:
helpers.bulk(es_client, document_list, index='my_index')
document_list = []
这个效果很好。
Elasticsearch.bulk()
函数可以替代使用,但是动作/操作是强制性的,作为这里可迭代的一部分,并且语法略有不同。这意味着我们需要一个 dict
来指定操作(在本例中为 dict
)以及每个文档的正文,而不只是包含文档内容的 "index": {}
。另请参阅 _bulk
文档:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
actions_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(32)}
actions_list.append({"index": {}, "doc": document})
if i % 1000 == 0:
es_client.bulk(operations=actions_list, index='my_index')
actions_list = []
这也很好用。
我假设以上两者在内部生成相同的
_bulk
REST API 语句,因此它们最终应该是等效的。
更新:
正如 Johan 所指出的,
helpers.bulk()
函数在内部负责分块(它实际上在内部调用 helpers.streaming_bulk()
),因此无需手动为其分配大小为 1000 的操作列表。对于我的最终解决方案,我最终编写了一个生成器函数,无论如何,它一次生成一个文档/操作。然后可以直接将其与您选择的 helpers.streaming_bulk()
一起直接传递给 chunk_size
(默认值为 500):
from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
def document_stream():
''' generator function for stream of actions '''
for i in range(1,10000000):
yield {'_index': 'my_index',
'_source': {'my_field': getrandbits(32)} }
for status_ok, response in helpers.streaming_bulk(es_client, actions=document_stream(), chunk_size=1000):
if not status_ok:
# if failure inserting, log response
print(response)