我尝试了实时数据流项目,并使用 kafka、elasticsearch、kibana、postgres 以及 docker compose 和 flink。
我的数据流是这样的:
kafka -> flink -> elasticsearch 和 postgres。
当我尝试将kafka流数据写入elasticsearch但在kibana开发工具控制台(GET索引/_search或GET索引)上时,我无法找到新数据,直到取消flink作业。
flink 作业启动 -> 在 kibana 上找不到新数据 -> 取消 flink 作业 -> 现在我可以在 kibana 上看到新数据了。
我的部分代码是
DataStream<Transaction> transactionStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
transactionStream.sinkTo(
new Elasticsearch7SinkBuilder<Transaction>()
.setHosts(new HttpHost("localhost", 9200, "http"))
.setEmitter((transaction, runtimeContext, requestIndexer) -> {
String json = convertTransactionToJson(transaction);
IndexRequest indexRequest = Requests.indexRequest()
.index("transactions")
.id(transaction.getTransactionId())
.source(json, XContentType.JSON);
requestIndexer.add(indexRequest);
})
.build()
).name("Elasticsearch Sink");
Postgres 数据库更新没问题。
我使用Mac并且
Java版本:11
弗林克:1.18.0
flink 连接器 kafka:3.0.1-1.18
flink sql连接器elasticsearch7:3.0.1-1.17
我尝试过的:
但是出现另一个错误
无法解析 Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK} 的响应正文
我的代码与这个存储库完全相同
https://github.com/airscholar/FlinkCommerce
https://www.youtube.com/watch?v=deepQRXnniM
所以我克隆了这个,尝试执行,但发生了同样的问题。 在他的 YouTube 上,这个问题不会发生。
为此我能做什么?
提供某种事务保证的 Flink 接收器将这些事务作为检查点的一部分提交。 Elasticsearch 接收器连接器需要启用检查点。