使用 Flink 从数据库动态获取新记录并发布到 kafka/axon 主题

问题描述 投票:0回答:1

我是 Flink 新手,有 2 个 Flink 作业的用例,第一个使用 Topic1 中的数据并在数据库中插入/更新,第二个获取数据库中插入/更新到不同服务使用的 Topic2 的任何新数据。

方法一:

    env.fromSource(axon.source(Constants.CONSUMER_TOPIC_NAME, Constants.CONSUMER_GROUP_ID),
                    WatermarkStrategy.noWatermarks(), "foo-kafka-source").map(axonMessage ->
                    (FooModel) axonMessage.getPayload())
            .addSink(jdbc.exactlyOnceSink(new FooEntityJdbcSink()))
            .uid("foo-kafka-source");

方法2

  DataStream<AxonMessage> stream = jdbc.source(env, new FooJdbcSource())
            .map(x -> {
                AxonMessage message = new AxonMessage(x);
                message.setTopic(Constants.PRODUCER_TOPIC_NAME);
                message.setKey(FooModel.class);
                return message;
            });

    stream.sinkTo(axon.sink()) 
            .name("foo-kafka-sink")
            .uid("foo-kafka-sink");

在FooJdbcSource中,用于获取数据的SQL查询是

SELECT {colm_1, colm_2, ...} FROM foo_table;

问题是作业 2 只是一次性挑选数据并推送到主题。作业运行时添加的任何新记录都不会被选取。我发现很少有像 JdbcDynamicTableSource 或 SourceFunction(已弃用)这样的解决方案,它们会以一定的时间间隔(窗口)检查数据库表并根据时间戳获取新数据;但该表预计不会频繁更新,我们需要在运行时立即对主题进行任何更改,而无需定期轮询/监视数据库,因为这会导致太多的数据库操作/点击。 Flink 有什么办法可以动态监控 DB 表吗?

任何帮助将不胜感激。

谢谢!!!

apache-flink flink-streaming
1个回答
0
投票

根据您使用的数据库类型,有许多与 Kafka 相关的连接器,它们将充当从数据库到 Kafka 主题的变更数据捕获 (CDC) 流,然后可以通过 Flink 作业监听该主题以流式传输发生的更改/更新。

有很多不同的受支持的 Kafka 连接器,具体取决于您喜欢的独立于 Flink 的数据库风格,例如 Debezium,它支持各种关系和非关系数据库,包括:

  • MongoDB
  • MySQL
  • PostgreSQL
  • SQL 服务器
  • 甲骨文
  • Db2
  • 卡桑德拉
  • Vitess(孵化中)
  • 扳手
  • JDBC(孵化中)
  • Informix(孵化中)

Flink 还提供了一个 Flink CDC 项目,该项目也支持通过 Flink 的其他连接器来执行相同的行为。此外,其他几种技术也提供了自己的官方 CDC 连接器来支持此行为。

© www.soinside.com 2019 - 2024. All rights reserved.