我们有一个应用程序,它从Kafka主题(3个分区)中消费消息,并丰富数据和保存记录到DB中(Spring JPA),然后将消息发布到另一个kafka主题(在同一个经纪商上),所有这些都是通过使用Camel 2.4.1和Spring Boot 2.1.7.RELEASE协调的。
我们希望实现kafka消费者-生产者组合的 "exactly-once "语义。
消费者设置。
autoOffsetReset: earliest
autoCommitEnable: false
allowManualCommit: true
breakOnFirstError: true
group.id : CONSUMER.GROUP.ID
count: 3
max.poll.records = 1 # rollback when message processing fails.
生产者设置:
idempotence: true
transactionIdPrefix: txn-prefix-id
Bean Wiring:
@Bean
SpringTransactionPolicy springTransactionPolicy() throws Exception {
SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
txRequired.setTransactionManager(transactionManager());
txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return txRequired;
}
@Bean
public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<byte[], byte[]>(
kafkaConfigs());
// enable transaction manager
defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
return defaultKafkaProducerFactory;
}
@Bean
@Primary
public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
return new ChainedKafkaTransactionManager<>(kafkaTransactionManager(),jpaTransactionManager());
}
@Bean
public PlatformTransactionManager kafkaTransactionManager() {
KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);
kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
kafkaTransactionManager.setRollbackOnCommitFailure(true);
return kafkaTransactionManager;
}
@Bean
JpaTransactionManager jpaTransactionManager() {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setRollbackOnCommitFailure(true);
return transactionManager;
}
骆驼路线。
public RoutesBuilder inboundRoute() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
//Common error handler
onException(UnsupportedMessageTypeException.class).
maximumRedeliveries(redeliveryCount).
handled(true).
bean(ExceptionPropagatorProcessor.class, "process").
bean(manualCommitProcessor).
end();
onException(AppRuntimeException.class).
maximumRedeliveries(redeliveryCount).
bean(ExceptionPropagatorProcessor.class, "process")
end();
onException(RetryExhaustedException.class).
maximumRedeliveries(0).// No retry for this exception
handled(true).
bean(ExceptionPropagatorProcessor.class, "process").
bean(kafkaManualCommitProcessor).
end();
from("kafka:inboundTopic").
routeId("consume-msg").
transacted("springTransactionPolicy").
bean(transactionBeginProcessor).
//check if this is a retry scenario, the max retry count reached then throw RetryExhaustedException.
bean(retryEvaluationProcessor).
bean(enrichProcessor). // publish kafka messages
bean(persistenceProcessor).
bean(transactionEndProcessor). // publish kafka messages
bean(manualCommitProcessor);
但我们无法让kafka生产者在有异常处理的情况下提交消息。我缺少什么,正确的方法是什么?
你似乎在使用Spring Kafka,而他们的KafkaTransactionManager并不是一个真正的XA事务管理器(参见他们的文档中的限制),所以你不能用它来回滚Kafka和JDBC数据库等。
而且camel-kafka不支持Kafka事务(目前)。我已经创建了一个票据。https:/issues.apache.orgjirabrowseCAMEL-15016。