Spring Boot
v2.6.7
应用程序已在生产中可用,它使用来自 Kafka
的数据,进行转换并将数据存储到数据库。
要处理 100k
Kafka
记录应用程序需要大约。在我的本地计算机上花费 10 分钟(其中 Kafka
中的本地 docker
,启用一个分区和一个消费者的主题):
first run: 10 min 20 sec.
second run: 10 min 05 sec.
third run: 10 min 36 sec.
我决定引入非阻塞错误处理并添加一些bean:
@Bean
public KafkaTemplate<String, String> retryProducerKafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getConsumer().getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaProperties.getConsumer().getSecurity().getProtocol());
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public RetryTopicConfiguration retryTopicConfiguration() {
return RetryTopicConfigurationBuilder
.newInstance()
.maxAttempts(2)
.retryTopicSuffix("-retry")
.doNotAutoCreateRetryTopics()
.dltSuffix("-dlt")
.includeTopics(consumerTopics)
.exponentialBackoff(1000, 2, 10000)
.retryOn(List.of(DataAccessException.class))
.dltHandlerMethod("eventsListener", "dlt")
.create(retryProducerKafkaTemplate());
}
我实际上所做的一切 - 它只是添加了这 3 个 bean,没有其他配置受到影响。
运行新一批 100k 测试并得到以下结果:
first run: 11min 20sec
second run:11min 12sec
third run: 11min 30sec
似乎为
Kafka
添加非阻塞错误处理会立即降低性能 ~10%,这让我很惊讶。
还有人遇到同样的情况吗?并且还对应用程序性能产生负面影响? 或者我可能只是错过了配置中的某些内容?
您不仅添加了该 beans,还启用了 指数退避。
参见backoff-handlers参考,特别是这个声明:
...默认处理程序只是挂起线程直到退避时间 通过(或容器停止)...
由于您使用单分区主题,因此不存在并发因素。在这种情况下,如果出现错误,整个消费可能会被阻止。
根据提供的配置,除了显式序列化映射之外,没有任何东西可以降低吞吐量。如果消费者实际上没有提出错误,则尝试注释该行并重新运行测试。字符串序列化可能比二进制序列化慢。