多线程事务性kafka生产者-我应该在关闭之前刷新吗?

问题描述 投票:1回答:1

让我们考虑多线程跨国kafka生产者。我应该在关闭之前flush()生产者吗?换句话说,事务生成器是否在发送批量数据之前先对其进行缓冲?

java apache-kafka kafka-producer-api
1个回答
0
投票

javadocs中所述

应用程序无需为事务处理调用flush方法生产者,因为commitTransaction()将刷新所有缓冲的提交之前的记录

这在javadoc示例中得到最好的说明

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
 producer.beginTransaction();
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
 producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
 // We can't recover from these exceptions, so our only option is to close the producer and exit.
 producer.close();
} catch (KafkaException e) {
 // For all other exceptions, just abort the transaction and try again.
 producer.abortTransaction();
}
producer.close();

简而言之,最好使用生产者的事务性API(这些API会阻塞并在失败时引发异常)。

此外,对于多线程应用程序,您需要确保每个生产者只有一个未完成的事务。而且,如果您在交易过程中遇到异常,则应调用producer.abortTransaction()(在示例中也已突出显示)以维护生产者交易能力的[[仅一次语义。

© www.soinside.com 2019 - 2024. All rights reserved.