How do I implement an "exactly-once" Kafka consumer with Apache Kafka?


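We have an application that consumes messages from a Kafka topic (3 partitions), enriches the data and saves the records to a DB (Spring JPA), and then publishes messages to another Kafka topic (on the same broker). All of this is orchestrated with Camel 2.4.1 and Spring Boot 2.1.7.RELEASE.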

We want to achieve "exactly-once" semantics for the Kafka consumer-producer combination.

Consumer settings:

   autoOffsetReset: earliest
   autoCommitEnable: false
   allowManualCommit: true
   breakOnFirstError: true
   group.id : CONSUMER.GROUP.ID
   count: 3
   max.poll.records = 1  # rollback when message processing fails.

Producer settings:

   idempotence: true
   transactionIdPrefix: txn-prefix-id
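
For reference, the endpoint options above correspond roughly to the plain Kafka client properties sketched below. This is only an illustration, not the Camel configuration itself: the class `ExactlyOnceClientConfig`, the instance suffix appended to the transactional id, `acks=all`, and the `isolation.level=read_committed` setting for consumers of the outbound topic are assumptions that do not appear in the question.

```java
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
final class ExactlyOnceClientConfig {

    // Consumer of the inbound topic: offsets are committed manually, one record per poll.
    static Properties consumerProps(String groupId) {
        Properties p = new Properties();
        p.setProperty("group.id", groupId);
        p.setProperty("auto.offset.reset", "earliest");
        p.setProperty("enable.auto.commit", "false");
        p.setProperty("max.poll.records", "1");
        return p;
    }

    // Assumption: consumers of the outbound topic should skip records
    // that belong to aborted or still-open transactions.
    static Properties downstreamConsumerProps(String groupId) {
        Properties p = consumerProps(groupId);
        p.setProperty("isolation.level", "read_committed");
        return p;
    }

    // Transactional producer: idempotence plus a stable transactional.id per instance.
    static Properties producerProps(String transactionIdPrefix, int instance) {
        Properties p = new Properties();
        p.setProperty("enable.idempotence", "true");
        p.setProperty("acks", "all"); // required when idempotence is enabled
        p.setProperty("transactional.id", transactionIdPrefix + "-" + instance);
        return p;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("CONSUMER.GROUP.ID"));
        System.out.println(downstreamConsumerProps("DOWNSTREAM.GROUP.ID"));
        System.out.println(producerProps("txn-prefix-id", 0));
    }
}
```

Without `read_committed` on the downstream side, a consumer of the outbound topic would also see messages from transactions that were later aborted.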

Bean Wiring:

   @Bean
    SpringTransactionPolicy springTransactionPolicy() throws Exception {
        SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
        txRequired.setTransactionManager(transactionManager());
        txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return txRequired;
    }

    @Bean
    public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
        DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<byte[], byte[]>(
                kafkaConfigs());
        // enable transaction manager
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }


    @Bean
    @Primary
    public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager(),jpaTransactionManager());
    }

    @Bean
    public PlatformTransactionManager kafkaTransactionManager() {
        // Call the producerFactory() bean method; there is no producerFactory field in this snippet.
        KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory());
        kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        return kafkaTransactionManager;
    }

    @Bean
    JpaTransactionManager jpaTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setRollbackOnCommitFailure(true);
        return transactionManager;
    }

Camel route:

public RoutesBuilder inboundRoute() {
        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {
                //Common error handler
                onException(UnsupportedMessageTypeException.class).
                   maximumRedeliveries(redeliveryCount).
                   handled(true).
                   bean(ExceptionPropagatorProcessor.class, "process").
                   bean(manualCommitProcessor).
                end();

                onException(AppRuntimeException.class).
                   maximumRedeliveries(redeliveryCount).
                   bean(ExceptionPropagatorProcessor.class, "process").
                end();

                onException(RetryExhaustedException.class).
                   maximumRedeliveries(0).// No retry for this exception
                   handled(true).
                   bean(ExceptionPropagatorProcessor.class, "process").
                   bean(kafkaManualCommitProcessor).
                end(); 

                from("kafka:inboundTopic").
                    routeId("consume-msg").
                    transacted("springTransactionPolicy").
                               bean(transactionBeginProcessor).
                               // If this is a retry and the max retry count has been reached, throw RetryExhaustedException.
                               bean(retryEvaluationProcessor). 
                               bean(enrichProcessor). // publish kafka messages
                               bean(persistenceProcessor).
                               bean(transactionEndProcessor). // publish kafka messages
                    bean(manualCommitProcessor);
            }
        };
}

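But we are unable to get the Kafka producer to commit its messages when exception handling is involved. What am I missing, and what is the right way to do this?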

java spring apache-kafka apache-camel spring-camel
1 Answer

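You appear to be using Spring Kafka, and its KafkaTransactionManager is not a true XA transaction manager (see the limitations described in its documentation), so you cannot use it to roll back both Kafka and a JDBC database atomically.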
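
To make that limitation concrete, here is a simplified model of how a chained (non-XA) transaction manager behaves: transactions begin in declaration order and commit in reverse order, so with the chain from the question (Kafka first, JPA second) the JPA commit happens before the Kafka commit. The classes `ChainedCommitDemo` and `Resource` are hypothetical and only imitate that ordering; this is not Spring's implementation.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of a chained (non-XA) transaction manager.
final class ChainedCommitDemo {

    /** One transactional resource, e.g. the Kafka producer or the JPA EntityManager. */
    static final class Resource {
        final String name;
        final boolean failOnCommit;
        String state = "idle";

        Resource(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }

        void begin() { state = "active"; }

        void commit() {
            if (failOnCommit) {
                state = "rolled-back";
                throw new IllegalStateException(name + " commit failed");
            }
            state = "committed";
        }

        void rollback() {
            // A committed resource cannot be undone: there is no two-phase commit here.
            if (!state.equals("committed")) state = "rolled-back";
        }
    }

    /** Begins in declaration order and commits in reverse order. */
    static void runChained(List<Resource> resources) {
        for (Resource r : resources) r.begin();
        for (int i = resources.size() - 1; i >= 0; i--) {
            try {
                resources.get(i).commit();
            } catch (RuntimeException e) {
                // Resources that have not committed yet are rolled back;
                // anything committed earlier in this loop stays committed.
                for (int j = i - 1; j >= 0; j--) resources.get(j).rollback();
                return;
            }
        }
    }

    public static void main(String[] args) {
        Resource kafka = new Resource("kafka", true);  // declared first, commits last
        Resource jpa = new Resource("jpa", false);     // declared second, commits first
        runChained(Arrays.asList(kafka, jpa));
        System.out.println("jpa=" + jpa.state + ", kafka=" + kafka.state);
        // prints "jpa=committed, kafka=rolled-back"
    }
}
```

In this model the database rows survive while the outbound Kafka messages are discarded, which is exactly the partial outcome a real XA manager would prevent.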

Also, camel-kafka does not support Kafka transactions (yet). I have created a ticket: https://issues.apache.org/jira/browse/CAMEL-15016.
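
Until that is available, one commonly used workaround (not something proposed in this answer) is to make the database write idempotent, so that a redelivered message causes no duplicate row. The sketch below assumes a unique key built from topic, partition, and offset; the class `IdempotentSink` is hypothetical and an in-memory map stands in for the table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: an in-memory map stands in for a DB table
// with a unique constraint on (topic, partition, offset).
final class IdempotentSink {

    private final Map<String, String> rows = new HashMap<>();

    /** Returns true if the record was stored, false if it was a redelivered duplicate. */
    boolean persistOnce(String topic, int partition, long offset, String payload) {
        String key = topic + "-" + partition + "-" + offset;
        return rows.putIfAbsent(key, payload) == null;
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        System.out.println(sink.persistOnce("inboundTopic", 0, 42L, "enriched")); // true: first delivery
        System.out.println(sink.persistOnce("inboundTopic", 0, 42L, "enriched")); // false: redelivery ignored
        System.out.println(sink.size());                                          // 1
    }
}
```

This gives at-least-once delivery with an effectively-once effect on the database side; it does not by itself deduplicate the messages published to the outbound topic.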
