我们正在使用Spring Cloud Stream for Kafka并寻找Exactly Once Semantics。我们有一个解决方案,正常工作正常1)从生产者启用Idempotent和交易2)使用MetaDataStore检查来自消费者方的重复消息与密钥(offsetId + partitionId + topicName)使用上述解决方案,我们没有任何消息丢失没有重复的处理
但现在我们发现有一个属性(producer.sendOffsetsToTransaction
)Kafka API帮助我们从消费者端修复重复处理而没有任何元数据库逻辑。现在我不知道我们怎么能用这个属性.sendOffsetsToTransaction
的春云流做到这一点
如果将KafkaTransactionManager
添加到应用程序上下文,它将由框架自动处理。
您必须在配置中添加事务ID前缀。
spring.kafka.producer.transaction-id-prefix
和Boot将自动添加一个事务管理器。
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
启用活页夹中的事务。请参阅kafka文档中的transaction.id和spring-kafka文档中的Transactions。启用事务时,将忽略各个生产者属性,并且所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer。*属性。
当侦听器正常退出时,侦听器容器在提交事务之前将偏移量发送到事务。