Transactional Kafka producer


I am trying to make my Kafka producer transactional. I am sending 10 messages; if any error occurs, no message should be delivered to Kafka, i.e. all or nothing.

I am using Spring Boot with KafkaTemplate.

@Configuration
@EnableKafka
public class KakfaConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        // appProps.getJksLocation());
        // config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        // appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

}

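I am sending 10 messages as shown below, following the documentation. 9 messages should be sent, and 1 message is larger than 1 MB, so the Kafka broker rejects it with RecordTooLargeException.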

https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager

@Component
@EnableTransactionManagement
class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info("Message success: " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                // Log the cause so the failing record can be diagnosed.
                LOG.error("Message failed", ex);
                latch.countDown();
            }
        };

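        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            future.addCallback(callback);
            // Keep the future so the timeout branch below can check which sends never completed.
            futures.add(future);
        });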

        if (latch.await(12, TimeUnit.MINUTES)) {
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }
    }
}

But when I look at the topic t_hello_world, there are 9 messages. I expected to see 0 messages, since my producer is transactional. How can I achieve that?

I get the following logs:

2020-04-30 18:04:36.036 ERROR 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:923) ~[kafka-clients-2.4.1.jar:na]
    at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:297) ~[kafka-clients-2.4.1.jar:na]
    at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1013) ~[kafka-clients-2.4.1.jar:na]
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:296) ~[kafka-clients-2.4.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:713) ~[kafka-clients-2.4.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java



Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

2020-04-30 18:04:36.037  WARN 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
2020-04-30 18:04:36.038  INFO 18688 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-prod-990, transactionalId=prod-990] Closing the Kafka producer with timeoutMillis = 5000 ms.
2020-04-30 18:04:36.038  INFO 18688 --- [oducer-prod-990] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-990, transactionalId=prod-990] Aborting incomplete transaction due to shutdown

apache-kafka spring-kafka kafka-producer-api

1 answer (2 votes)

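Uncommitted records are written to the log; when a transaction commits or rolls back, an extra record is written to the log with the state of the transaction.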

Consumers, by default, see all records, including uncommitted records (but not the special commit/abort record).
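To make this behaviour concrete, here is a small toy model in plain Java. It is only a sketch of the idea and not how Kafka is implemented internally: `TxLogModel`, `Entry`, `Kind`, `sampleLog` and `visible` are hypothetical names invented for this illustration, and the sample log assumes the situation from the question (nine records appended, then the transaction aborted).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a log with transaction markers. Illustration only, not Kafka's real implementation. */
public class TxLogModel {

    public enum Kind { DATA, COMMIT, ABORT }

    /** One log entry: a data record or a control marker, tagged with a made-up transaction id. */
    public static final class Entry {
        final Kind kind;
        final String txId;
        final String value;

        public Entry(Kind kind, String txId, String value) {
            this.kind = kind;
            this.txId = txId;
            this.value = value;
        }
    }

    /** Assumed scenario: nine records are appended, then the transaction is aborted. */
    public static List<Entry> sampleLog() {
        List<Entry> log = new ArrayList<>();
        for (int i = 1; i <= 9; i++) {
            log.add(new Entry(Kind.DATA, "tx-1", "msg-" + i));
        }
        // The abort does not delete anything; it appends a marker record.
        log.add(new Entry(Kind.ABORT, "tx-1", null));
        return log;
    }

    /** Returns the values an application would see under the given isolation level. */
    public static List<String> visible(List<Entry> log, boolean readCommitted) {
        // Collect the transactions that ended with a COMMIT marker.
        Set<String> committed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind == Kind.COMMIT) {
                committed.add(e.txId);
            }
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                continue; // control markers are never handed to the application
            }
            if (!readCommitted || committed.contains(e.txId)) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> log = sampleLog();
        System.out.println(visible(log, false).size()); // read_uncommitted: prints 9
        System.out.println(visible(log, true).size());  // read_committed: prints 0
    }
}
```

In this model the nine records stay in the log after the abort, which matches what the question observed; whether they are visible depends only on the reader's isolation level.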

For the console consumer, you need to set the isolation level to read_committed. See the help:

--isolation-level <String>           Set to read_committed in order to      
                                       filter out transactional messages    
                                       which are not committed. Set to      
                                       read_uncommitted to read all          
                                       messages. (default: read_uncommitted)
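
The same setting applies to a Java consumer through the `isolation.level` consumer property. Below is a minimal sketch that only builds the properties with the JDK, so it runs without Kafka on the classpath; the class name `ReadCommittedConsumerProps`, the broker address and the group id are placeholders I made up, not values from the question.

```java
import java.util.Properties;

/** Sketch: consumer properties that hide records from open or aborted transactions. */
public class ReadCommittedConsumerProps {

    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The default is read_uncommitted, which also returns records of aborted transactions.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "demo-group" are placeholder values.
        Properties props = build("localhost:9092", "demo-group");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

With kafka-clients available, these properties would be passed to `new KafkaConsumer<String, String>(props)`; in a Spring Boot application the equivalent is presumably the same key in the consumer factory's configuration map.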