我们正在为Kafka使用Spring Cloud Stream,我们正在寻找具有消费者API的Exactly Once Semantics

问题描述 投票:0回答:1

我们正在使用Spring Cloud Stream for Kafka并寻找Exactly Once Semantics。我们有一个解决方案,正常工作正常1)从生产者启用Idempotent和交易2)使用MetaDataStore检查来自消费者方的重复消息与密钥(offsetId + partitionId + topicName)使用上述解决方案,我们没有任何消息丢失没有重复的处理

但现在我们发现有一个属性(producer.sendOffsetsToTransaction)Kafka API帮助我们从消费者端修复重复处理而没有任何元数据库逻辑。现在我不知道我们怎么能用这个属性.sendOffsetsToTransaction的春云流做到这一点

kafka-consumer-api kafka-producer-api spring-cloud-stream
1个回答
0
投票

如果将KafkaTransactionManager添加到应用程序上下文,它将由框架自动处理。

您必须在配置中添加事务ID前缀。

spring.kafka.producer.transaction-id-prefix

和Boot将自动添加一个事务管理器。

producer properties

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

启用活页夹中的事务。请参阅kafka文档中的transaction.id和spring-kafka文档中的Transactions。启用事务时,将忽略各个生产者属性,并且所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer。*属性。

当侦听器正常退出时,侦听器容器在提交事务之前将偏移量发送到事务。

© www.soinside.com 2019 - 2024. All rights reserved.