在单个事务中将数据接收到数据库和Kafka/Axon

问题描述 投票:0回答:1

我是 Flink 新手,有一个用例来使用 Topic1 中的数据并在数据库中插入/更新,并将相同的数据推送到将由不同服务使用的 Topic2。我现在的代码如下所示:

DataStream<AxonMessage> stream = 
                env.fromSource(axon.source(Constants.CONSUMER_TOPIC_NAME, Constants.CONSUMER_GROUP_ID),
                    WatermarkStrategy.noWatermarks(), "foo-kafka-source")
                    .map(axonMessage -> (FooModel) axonMessage.getPayload());

stream.addSink(jdbc.exactlyOnceSink(new FooJdbcSink()))
                .name("data-db-sink")
                .uid("data-db-sink");

stream.sinkTo(kafka.exactlyOnceSink(fooSchema))
                .name("data-kafka-sink")
                .uid("data-kafka-sink");

要求是将两个接收器操作作为一个事务完成,即如果数据未插入到数据库中,则也不应将其推送到主题2(如果出现任何错误)。 上面的示例代码是否足够(我也参考了这篇文章),或者如果不够如何在 Flink 中管理事务。

任何帮助将不胜感激。

谢谢!!!

apache-flink flink-streaming
1个回答
0
投票

答案取决于您的数据库,因为 Flink 的某些数据库连接器不支持 Exactly Once 语义。

在您的示例中,您的数据库是 JDBC,根据 Flink 的文档,它对恰好一次有一些支持,您可以阅读here

对于Kafka,你需要正确配置你的KafkaSink,并设置

DeliveryGuarantee.EXACTLY_ONCE
作为保证,这里是一个例子:

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers(brokers)
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("topic-name")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

您可以在此处阅读更多相关信息。

除此之外,Flink 通过仅在检查点完成后才提交 Kafka 或数据库的事务,实现了 Exactly Once 语义。因此,如果由于某种原因失败,它将不会提交事务并从之前成功完成的检查点开始。这样,它可以保证您不会两次生成相同的消息。

© www.soinside.com 2019 - 2024. All rights reserved.