我们使用 Kakfa connect 和
JdbcSourceConnector
从 PostgreSQL 中的简单发件箱表中获取事件,然后将它们推送到 Kafka。连接器之前的配置如下:
"mode": "incrementing",
"incrementing.column.name": "id",
"query": "SELECT * FROM outbox",
我们想添加延迟并将配置更改为:
"mode": "timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "created",
"timestamp.delay.interval.ms": 10000,
"timestamp.initial": -1,
"db.timezone": "Europe/Vienna",
现在一切正常,但由于某种原因当我们应用此配置时,我们的整个发件箱被重播了。这不是我们所期望的,因为文档指出
timestamp.initial
是
用于使用时间戳条件的初始查询的纪元时间戳(以毫秒为单位)。使用 -1 来使用当前时间。如果未指定,将检索所有数据。
正如您在上面看到的,我们相应地配置了
timestamp.initial
。
那么如何解释这种行为呢?为什么 kafka-connect 决定重新发送所有消息,即使它们的时间戳肯定早于当前时间?
发生这种情况的原因有多种。
对于您进行更改后连接器的第一次重新启动,它可能会触发重新初始化,并且 timestamp.initial 将设置为 当前时间。这可能会导致连接器重新处理所有数据。
确保db.timezone设置正确,以匹配数据库“已创建”列中使用的时区。如果时区不匹配,时间戳可能会有不同的解释。
检查 PostgreSQL 表的“已创建”列中时间戳数据的精度。如果时间戳值精度较高,则初始时间戳与数据的实际时间戳之间可能存在差异。