我正在尝试使用Kafka的Connect API中的SMT索引Elasticsearch中的消息。
到目前为止,我只是简单地使用主题和时间戳路由器功能。但是,现在我想基于消息中的某个字段创建单独的索引。
假设消息的格式如下:
{"productId": 1, "category": "boat", "price": 135000}
{"productId": 1, "category": "helicopter", "price": 300000}
{"productId": 1, "category": "car", "price": 25000}
是否可以根据产品类别将这些指数编入以下指数?
或者我是否必须为每个类别创建单独的主题(知道它可能成为数百或数千个)?
我是否正在监督可以执行此操作的转换,或者这根本不可能并且是否必须构建自定义组件?
Kafka Connect没有开箱即用的功能。你有几个选择:
CREATE STREAM product-boat AS SELECT * FROM messages WHERE category='boat'
等)
Kafka Streams现在拥有动态路由(KIP-303),这将是一种更灵活的方式如果您使用的是Confluent Platform
,您可以根据消息中的字段值进行某种路由。
要做到这一点,你必须使用汇合的ExtractTopic
SMT。有关该SMT的更多细节可以在https://docs.confluent.io/current/connect/transforms/extracttopic.html#extracttopic找到
Kafka Sink Connector处理由SinkRecord
表示的消息。每个SinkRecord
包含几个字段:topic
,partition
,value
,key
等。这些字段由Kafka Connect设置并使用转换您可以更改这些值。 ExtractTopic
SMT根据消息的topic
或value
更改key
的值。
转换配置将是这样的:
{
...
"transforms": "ExtractTopic",
"transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.ExtractTopic.field": "name", <-- name of field, that value will be used as index name
...
}
一个限制是,您必须提前创建索引。
我怎么假设你使用Elasticsearch Sink Connector。 Elasticsearch连接器具有创建索引的能力,但它在打开时执行 - 为特定分区(ElasticsearchSinkTask::open
)创建编写器的方法。在您的用例当时,无法创建所有索引,因为所有消息的值都不可用。
也许这不是最纯粹的方法,因为ExtractTopic
应该用于Source连接器,但在你的情况下它可能会起作用。