elasticsearch、kibana 上不显示实时数据

问题描述 投票:0回答:1

我尝试了实时数据流项目,并使用 kafka、elasticsearch、kibana、postgres 以及 docker compose 和 flink。
我的数据流是这样的:
kafka -> flink -> elasticsearch 和 postgres。

当我尝试将kafka流数据写入elasticsearch但在kibana开发工具控制台(GET索引/_search或GET索引)上时,我无法找到新数据,直到取消flink作业。

flink 作业启动 -> 在 kibana 上找不到新数据 -> 取消 flink 作业 -> 现在我可以在 kibana 上看到新数据了。

我的部分代码是

DataStream<Transaction> transactionStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");

        transactionStream.sinkTo(
                new Elasticsearch7SinkBuilder<Transaction>()
                        .setHosts(new HttpHost("localhost", 9200, "http"))
                        .setEmitter((transaction, runtimeContext, requestIndexer) -> {
                            String json = convertTransactionToJson(transaction);

                            IndexRequest indexRequest = Requests.indexRequest()
                                    .index("transactions")
                                    .id(transaction.getTransactionId())
                                    .source(json, XContentType.JSON);
                            requestIndexer.add(indexRequest);
                        })
                        .build()
        ).name("Elasticsearch Sink");

Postgres 数据库更新没问题。

我使用Mac并且
Java版本:11
弗林克:1.18.0
flink 连接器 kafka:3.0.1-1.18
flink sql连接器elasticsearch7:3.0.1-1.17

我尝试过的:

  1. 附加 setBulkFlushInterval(30000) 选项 因为我发现了这个日志 警告 org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter - 在 Elasticsearch 确认所有记录之前,Writer 已关闭。

但是出现另一个错误
无法解析 Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK} 的响应正文

  1. 克隆原始代码库

我的代码与这个存储库完全相同 https://github.com/airscholar/FlinkCommerce
https://www.youtube.com/watch?v=deepQRXnniM

所以我克隆了这个,尝试执行,但发生了同样的问题。 在他的 YouTube 上,这个问题不会发生。

为此我能做什么?

elasticsearch apache-kafka apache-flink kibana
1个回答
0
投票

提供某种事务保证的 Flink 接收器将这些事务作为检查点的一部分提交。 Elasticsearch 接收器连接器需要启用检查点。

© www.soinside.com 2019 - 2024. All rights reserved.