让我们考虑多线程跨国kafka生产者。我应该在关闭之前flush()
生产者吗?换句话说,事务生成器是否在发送批量数据之前先对其进行缓冲?
如javadocs中所述
应用程序无需为事务处理调用flush方法生产者,因为commitTransaction()将刷新所有缓冲的提交之前的记录
这在javadoc示例中得到最好的说明
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
简而言之,最好使用生产者的事务性API(这些API会阻塞并在失败时引发异常)。
此外,对于多线程应用程序,您需要确保每个生产者只有一个未完成的事务。而且,如果您在交易过程中遇到异常,则应调用producer.abortTransaction()
(在示例中也已突出显示)以维护生产者交易能力的[[仅一次语义。