kafka连接器中的动态主题

问题描述 投票:1回答:2

kafka添加了在连接器中使用正则表达式的新功能,但是在重新启动连接器之前,似乎不会消耗连接器启动后新添加的主题中的主题数据。我们需要动态添加新主题,并让连接器根据连接器属性中定义的正则表达式使用主题。如何实现?例如:正则表达式:主题 - 。*主题:topic-1,topic-2如果我介绍新主题topic-3,那么如何在不重新启动主题数据的情况下使连接器使用主题数据?

apache-kafka apache-kafka-connect confluent
2个回答
0
投票

根据其他人已经在评论中提出的想法,基本上你需要做的是建立一个机制,确定引入了一个新主题,并且需要干净地重新启动连接器。

我会做这样的事情,

1>在已连接的主题(例如topic-1)中发送特定类型的消息,如果收到此类消息,则代码应保存所有新的msg轮询并等待所有偏移提交完成。

2>然后从轮询循环中断并从您的使用者中删除订阅(consumer.unsubscribe())。

3>之后通常需要遵循正则表达式主题订阅的流程,因为新主题现在将成为正则表达式的一部分。

请记住提交很重要,如果你急速重启连接器,你可能会得到重复。还有一点显然不要改变group.id并将auto.offset.reset保持为'latest'。


0
投票

Kafka消费者有一个选项metadata.max.age.ms - 消费者刷新主题元数据的时间间隔。如果你不需要真正的实时,这可能会有所帮助。另见:kafka consumer to dynamically detect topics added

/etc/kafka-connect/kafka-connect.properties你应该指定consumer.metadata.max.age.ms=1000 1秒钟。

© www.soinside.com 2019 - 2024. All rights reserved.