当从多个源插入源主题中的记录时,Kafka源连接器不会按预期拉动记录

问题描述 投票:0回答:1

在我的一个用例中,我正在尝试创建一个管道

每当我从自定义分区发送消息时,我都会使用LONG数据类型以毫秒为单位发送时间戳,因为在架构中,timestamp列已定义为long。

我之前在自定义分区中的代码:

Date date = new Date();
long timeMilli = date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

在我发送记录之前显示结果:

date = Tue Mar 26 22:02:04 EDT 2019,time in millis = 15536​​52124063

在table2中的timestamp列中插入的值:

3/27/2019 2:02:04.063000 AM

由于它采用英国时区(我相信),我暂时修复了从当前时间戳减去4小时的时间,以便我可以匹配美国EST时间戳。

Date date = new Date();
Date adj_date = DateUtils.addHours(date,-4);
long timeMilli = adj_date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

显示结果:

date = Tue Mar 26 22:04:43 EDT 2019,time in millis = 15536​​37883826

在table2中的timestamp列中插入的值:

3/26/2019 10:04:43.826000 PM

如果我错过任何东西,请告诉我,因为当我从自定义分区发送消息时,我不确定为什么会发生这种情况。

apache-kafka kafka-producer-api apache-kafka-connect
1个回答
1
投票

在引擎盖下Jdbc Source Connector使用以下查询:

SELECT * FROM someTable
WHERE
someTimestampColumn < $endTimetampValue
AND (
    (someTimestampColumn = $beginTimetampValue AND someIncrementalColumn > $lastIncrementedValue)
    OR someTimestampColumn > $beginTimetampValue)
ORDER BY someTimestampColumn, someIncrementalColumn ASC

总结:如果查询的时间戳列的值早于当前时间戳并且晚于上次检查,则查询将检索行。

以上参数是:

  1. beginTimetampValue - 上次导入记录的timestamp列的值
  2. endTimetampValue - 根据数据库的当前时间戳
  3. lastIncrementedValue - 上次导入记录的增量列的值

我认为在你的情况下,Producer将带有更高时间戳的Tables记录放入比以后手动插入(使用查询)。

当Jdbc Connector检查要导入Kafka的新记录时,它会跳过它们(因为它们没有满足someTimestampColumn < $endTimetampValue时间戳条件)

您还可以将日志级别更改为DEBUG,并查看日志中发生的情况

© www.soinside.com 2019 - 2024. All rights reserved.