我是 Flink 新手,有 2 个 Flink 作业的用例,第一个使用 Topic1 中的数据并在数据库中插入/更新,第二个获取数据库中插入/更新到不同服务使用的 Topic2 的任何新数据。
方法一:
env.fromSource(axon.source(Constants.CONSUMER_TOPIC_NAME, Constants.CONSUMER_GROUP_ID),
WatermarkStrategy.noWatermarks(), "foo-kafka-source").map(axonMessage ->
(FooModel) axonMessage.getPayload())
.addSink(jdbc.exactlyOnceSink(new FooEntityJdbcSink()))
.uid("foo-kafka-source");
方法2
DataStream<AxonMessage> stream = jdbc.source(env, new FooJdbcSource())
.map(x -> {
AxonMessage message = new AxonMessage(x);
message.setTopic(Constants.PRODUCER_TOPIC_NAME);
message.setKey(FooModel.class);
return message;
});
stream.sinkTo(axon.sink())
.name("foo-kafka-sink")
.uid("foo-kafka-sink");
在FooJdbcSource中,用于获取数据的SQL查询是
SELECT {colm_1, colm_2, ...} FROM foo_table;
问题是作业 2 只是一次性挑选数据并推送到主题。作业运行时添加的任何新记录都不会被选取。我发现很少有像 JdbcDynamicTableSource 或 SourceFunction(已弃用)这样的解决方案,它们会以一定的时间间隔(窗口)检查数据库表并根据时间戳获取新数据;但该表预计不会频繁更新,我们需要在运行时立即对主题进行任何更改,而无需定期轮询/监视数据库,因为这会导致太多的数据库操作/点击。 Flink 有什么办法可以动态监控 DB 表吗?
任何帮助将不胜感激。
谢谢!!!
根据您使用的数据库类型,有许多与 Kafka 相关的连接器,它们将充当从数据库到 Kafka 主题的变更数据捕获 (CDC) 流,然后可以通过 Flink 作业监听该主题以流式传输发生的更改/更新。
有很多不同的受支持的 Kafka 连接器,具体取决于您喜欢的独立于 Flink 的数据库风格,例如 Debezium,它支持各种关系和非关系数据库,包括:
Flink 还提供了一个 Flink CDC 项目,该项目也支持通过 Flink 的其他连接器来执行相同的行为。此外,其他几种技术也提供了自己的官方 CDC 连接器来支持此行为。