Kafka使用(json)消息中的字段连接到Elasticsearch索引的主题

问题描述 投票:0回答:2

我正在尝试使用Kafka的Connect API中的SMT索引Elasticsearch中的消息。

到目前为止,我只是简单地使用主题和时间戳路由器功能。但是,现在我想基于消息中的某个字段创建单独的索引。

假设消息的格式如下:

{"productId": 1, "category": "boat", "price": 135000}
{"productId": 1, "category": "helicopter", "price": 300000}
{"productId": 1, "category": "car", "price": 25000}

是否可以根据产品类别将这些指数编入以下指数?

  • 产品船
  • 产品直升机
  • 产品车

或者我是否必须为每个类别创建单独的主题(知道它可能成为数百或数千个)?

我是否正在监督可以执行此操作的转换,或者这根本不可能并且是否必须构建自定义组件?

elasticsearch apache-kafka apache-kafka-connect confluent
2个回答
0
投票

Kafka Connect没有开箱即用的功能。你有几个选择:

  1. Elasticsearch接收器连接器将根据其主题将消息路由到目标索引,因此您可以编写一个自定义SMT来检查消息并将其路由到相应的主题
  2. 使用流处理器预处理消息,使得它们在Elasticsearch接收器连接器消耗时已经在不同的主题上。例如,Kafka Streams或KSQL。 KSQL你需要硬编码每个类别(CREATE STREAM product-boat AS SELECT * FROM messages WHERE category='boat'等) Kafka Streams现在拥有动态路由(KIP-303),这将是一种更灵活的方式
  3. 使用编码的逻辑手动编码定制的Elasticsearch接收器连接器,以根据消息内容将消息路由到索引。这感觉就像IMO三种方法中最糟糕的一样。

0
投票

如果您使用的是Confluent Platform,您可以根据消息中的字段值进行某种路由。

要做到这一点,你必须使用汇合的ExtractTopic SMT。有关该SMT的更多细节可以在https://docs.confluent.io/current/connect/transforms/extracttopic.html#extracttopic找到

Kafka Sink Connector处理由SinkRecord表示的消息。每个SinkRecord包含几个字段:topicpartitionvaluekey等。这些字段由Kafka Connect设置并使用转换您可以更改这些值。 ExtractTopic SMT根据消息的topicvalue更改key的值。

转换配置将是这样的:

{
...
    "transforms": "ExtractTopic",
    "transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.ExtractTopic.field": "name",  <-- name of field, that value will be used as index name
...
}

一个限制是,您必须提前创建索引。

我怎么假设你使用Elasticsearch Sink Connector。 Elasticsearch连接器具有创建索引的能力,但它在打开时执行 - 为特定分区(ElasticsearchSinkTask::open)创建编写器的方法。在您的用例当时,无法创建所有索引,因为所有消息的值都不可用。

也许这不是最纯粹的方法,因为ExtractTopic应该用于Source连接器,但在你的情况下它可能会起作用。

© www.soinside.com 2019 - 2024. All rights reserved.