我是 Flink 新手,有一个用例来使用 Topic1 中的数据并在数据库中插入/更新,并将相同的数据推送到将由不同服务使用的 Topic2。我现在的代码如下所示:
DataStream<AxonMessage> stream =
env.fromSource(axon.source(Constants.CONSUMER_TOPIC_NAME, Constants.CONSUMER_GROUP_ID),
WatermarkStrategy.noWatermarks(), "foo-kafka-source")
.map(axonMessage -> (FooModel) axonMessage.getPayload());
stream.addSink(jdbc.exactlyOnceSink(new FooJdbcSink()))
.name("data-db-sink")
.uid("data-db-sink");
stream.sinkTo(kafka.exactlyOnceSink(fooSchema))
.name("data-kafka-sink")
.uid("data-kafka-sink");
要求是将两个接收器操作作为一个事务完成,即如果数据未插入到数据库中,则也不应将其推送到主题2(如果出现任何错误)。 上面的示例代码是否足够(我也参考了这篇文章),或者如果不够如何在 Flink 中管理事务。
任何帮助将不胜感激。
谢谢!!!
答案取决于您的数据库,因为 Flink 的某些数据库连接器不支持 Exactly Once 语义。
在您的示例中,您的数据库是 JDBC,根据 Flink 的文档,它对恰好一次有一些支持,您可以阅读here。
对于Kafka,你需要正确配置你的KafkaSink,并设置
DeliveryGuarantee.EXACTLY_ONCE
作为保证,这里是一个例子:
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
您可以在此处阅读更多相关信息。
除此之外,Flink 通过仅在检查点完成后才提交 Kafka 或数据库的事务,实现了 Exactly Once 语义。因此,如果由于某种原因失败,它将不会提交事务并从之前成功完成的检查点开始。这样,它可以保证您不会两次生成相同的消息。