Bulk index error with Elasticsearch in Python


I am trying to ingest a simple Hello World example into a data stream using the bulk call, as shown in the docs at https://elasticsearch-py.readthedocs.io/en/v8.6.2/helpers.html#bulk-helpers, but it fails with:

Traceback (most recent call last):
  File "C:\Users\elastic\Documents\azure_repos\dataingestion\data-ingestion-test-function\data_stream\hello_world.py", line 117, in <module>
    bulk(client=client, index='test-data-stream', actions=data)
  File "C:\Users\elastic\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 524, in bulk
    for ok, item in streaming_bulk(
  File "C:\Users\elastic\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 438, in streaming_bulk
    for data, (ok, info) in zip(
  File "C:\Users\elastic\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 355, in _process_bulk_chunk
    yield from gen
  File "C:\Users\elastic\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 274, in _process_bulk_chunk_success
    raise BulkIndexError(f"{len(errors)} document(s) failed to index.", errors)
elasticsearch.helpers.BulkIndexError: 2 document(s) failed to index.

I have tried the same code against a regular index and it works.

Here is the client code:

from elasticsearch import Elasticsearch

client = Elasticsearch(
    "https://xx.xx.x.xx:9200",
    basic_auth=("username", "password"),
    verify_certs=False)

I have created the ILM policy, index template, and component template as shown in this tutorial:
https://opster.com/guides/elasticsearch/data-architecture/elasticsearch-data-streams/

I created these in Kibana, named the data stream test-data-stream, and confirmed via the Kibana UI that the data stream was created successfully.

I was able to ingest data into the data stream with an API call from Postman, but I am running into this problem when ingesting from Python code.

This is what I am trying to ingest:

data = [{"message": "Hello World", "@timestamp": "2023-01-11T11:54:44Z"},
            {"message": "Hello World1", "@timestamp": "2023-01-11T11:54:44Z"}]

I use this code to do the ingestion:

from elasticsearch.helpers import bulk

client.indices.delete_data_stream(name='test-data-stream', error_trace=True)
client.indices.create_data_stream(name='test-data-stream', error_trace=True)

bulk(client=client, index='test-data-stream', actions=data)

If I switch the index argument to a regular index, the code works fine, but it does not work with the data stream.

python elasticsearch indexing kibana bulkinsert
1 Answer

When indexing into a data stream you must use

op_type: create

whereas bulk defaults to

op_type: index

so you need to specify it in each document, like this:

data = [{"_op_type": "create", "message": "Hello World", "@timestamp": "2023-01-11T11:54:44Z"},
        {"_op_type": "create", "message": "Hello World1", "@timestamp": "2023-01-11T11:54:44Z"}]
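Instead of editing every document by hand, the same fix can be expressed as a small helper that wraps plain documents in the action format the bulk helper expects. This is a minimal sketch; the `as_create_actions` name is mine, not part of the elasticsearch library. Setting `_index` inside each action also makes the separate `index=` argument to `bulk()` unnecessary:

```python
def as_create_actions(docs, data_stream):
    """Yield bulk actions with op_type "create" for every document.

    Data streams only accept document creation, so each action must
    carry "_op_type": "create" instead of the bulk default "index".
    """
    for doc in docs:
        yield {"_op_type": "create", "_index": data_stream, **doc}

docs = [
    {"message": "Hello World", "@timestamp": "2023-01-11T11:54:44Z"},
    {"message": "Hello World1", "@timestamp": "2023-01-11T11:54:44Z"},
]

actions = list(as_create_actions(docs, "test-data-stream"))
print(actions[0]["_op_type"])  # -> create
```

You can then pass the generator straight to the helper, e.g. `bulk(client=client, actions=as_create_actions(docs, "test-data-stream"))`.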