kafka添加了在连接器中使用正则表达式的新功能,但是在重新启动连接器之前,似乎不会消耗连接器启动后新添加的主题中的主题数据。我们需要动态添加新主题,并让连接器根据连接器属性中定义的正则表达式使用主题。如何实现?例如:正则表达式:主题 - 。*主题:topic-1,topic-2如果我介绍新主题topic-3,那么如何在不重新启动主题数据的情况下使连接器使用主题数据?
根据其他人已经在评论中提出的想法,基本上你需要做的是建立一个机制,确定引入了一个新主题,并且需要干净地重新启动连接器。
我会做这样的事情,
1>在已连接的主题(例如topic-1)中发送特定类型的消息,如果收到此类消息,则代码应保存所有新的msg轮询并等待所有偏移提交完成。
2>然后从轮询循环中断并从您的使用者中删除订阅(consumer.unsubscribe())。
3>之后通常需要遵循正则表达式主题订阅的流程,因为新主题现在将成为正则表达式的一部分。
请记住提交很重要,如果你急速重启连接器,你可能会得到重复。还有一点显然不要改变group.id并将auto.offset.reset保持为'latest'。
Kafka消费者有一个选项metadata.max.age.ms
- 消费者刷新主题元数据的时间间隔。如果你不需要真正的实时,这可能会有所帮助。另见:kafka consumer to dynamically detect topics added
在/etc/kafka-connect/kafka-connect.properties
你应该指定consumer.metadata.max.age.ms=1000
1秒钟。