在 https://docs.spring.io/spring-kafka/reference/html/#overview-2 上声明如下:
从版本 2.5.8 开始,您现在可以在生产者工厂上配置 maxAge 属性。当使用可能为代理的 transactional.id.expiration.ms 闲置的事务生产者时,这非常有用。对于当前的 kafka-clients,这可能会导致 ProducerFencedException 且无需重新平衡。通过将 maxAge 设置为小于 transactional.id.expiration.ms,如果生产者超过了其最大年龄,工厂将刷新生产者。
怎么样。在哪里可以为默认生产者工厂配置
maxAge
?
我使用了
DefaultKafkaProducerFactoryCustomizer
,它似乎有效:
@Bean
public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer() {
return (producerFactory) -> producerFactory.setMaxAge(Duration.ofDays(1));
}
这应该在
@Configuration
课程中。
如果您使用流绑定器,您也可以这样设置
maxAge
@Bean
KafkaTransactionManager customKafkaTransactionManager(BinderFactory binder) {
KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)binder.getBinder("kafka1", MessageChannel.class);
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = (DefaultKafkaProducerFactory<byte[], byte[]>) kafka.getTransactionalProducerFactory();
producerFactory.setMaxAge(Duration.ofSeconds(60));
return new KafkaTransactionManager(producerFactory);
}