批量索引/使用python的elasticsearch创建文档

问题描述 投票:0回答:1

我正在使用 python 生成大量具有随机内容的 elasticsearch 文档,并使用 elasticsearch-py 对它们进行索引。

简化的工作示例(只有一个字段的文档):

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    es_client.index(index='my_index', document=document)

由于这会对每个文档发出一个请求,因此我尝试通过使用

_bulk
API 发送 1000 个文档块来加快速度。然而,到目前为止我的尝试都没有成功。

我对文档的理解是,你可以将一个迭代传递给

bulk()
,所以我尝试了:

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=document_list, index='my_index')
        document_list = []

但这会导致

elasticsearch.BadRequestError:BadRequestError(400,'illegal_argument_exception','格式错误的操作/元数据行 [1],应为 START_OBJECT 或 END_OBJECT,但发现为 [VALUE_STRING]')

python json elasticsearch bulkinsert elasticsearch-py
1个回答
8
投票

好吧,看来我混淆了两个不同的功能:

helpers.bulk()
Elasticsearch.bulk()
。两者都可以用来实现我想要做的事情,但它们的签名略有不同。

helpers.bulk()
函数采用
Elasticsearch()
对象和包含文档的可迭代对象作为参数。该操作可以指定为
_op_type
,并且可以是
index
create
delete
update
之一。由于
_op_type
默认为
index
,因此在本例中我们可以省略它并简单地传递文档列表:

from elasticsearch import Elasticsearch, helpers
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(32)}
    document_list.append(document)
    if i % 1000 == 0:
        helpers.bulk(es_client, document_list, index='my_index')
        document_list = []

这个效果很好。

Elasticsearch.bulk()
函数可以替代使用,但是动作/操作是强制性的,作为这里可迭代的一部分,并且语法略有不同。这意味着我们需要一个
dict
来指定操作(在本例中为
dict
)以及每个文档的正文,而不只是包含文档内容的
"index": {}
。另请参阅
_bulk
文档
:

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

actions_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(32)}
    actions_list.append({"index": {}, "doc": document})
    if i % 1000 == 0:
        es_client.bulk(operations=actions_list, index='my_index')
        actions_list = []

这也很好用。

我假设以上两者在内部生成相同的

_bulk
REST API 语句,因此它们最终应该是等效的。


更新

正如 Johan 所指出的,

helpers.bulk()
函数在内部负责分块(它实际上在内部调用
helpers.streaming_bulk()
),因此无需手动为其分配大小为 1000 的操作列表。对于我的最终解决方案,我最终编写了一个生成器函数,无论如何,它一次生成一个文档/操作。然后可以直接将其与您选择的
helpers.streaming_bulk()
一起直接传递给
chunk_size
(默认值为 500):

from elasticsearch import Elasticsearch, helpers                                
from random import getrandbits                                                  
                                                                                
es_client = Elasticsearch('https://elastic.host:9200')
                                                                                
def document_stream():                                                          
   ''' generator function for stream of actions '''                      
   for i in range(1,10000000):                                                  
       yield {'_index': 'my_index',                                   
              '_source': {'my_field': getrandbits(32)} }                                                                                                                                                  
                                                                                
for status_ok, response in helpers.streaming_bulk(es_client, actions=document_stream(), chunk_size=1000):
    if not status_ok:                                                           
        # if failure inserting, log response                                                  
        print(response)     
© www.soinside.com 2019 - 2024. All rights reserved.