在226-230的org.springframework.kafka.listener.KafkaMessageListenerContainer
行中,Spring Kafka将默认的SimpleAsyncTaskExecutor
指定为侦听器容器的使用者任务执行器:
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
在文档中,指出使用自定义执行程序而不是possible是SimpleAsyncTaskExecutor
:
如果您不提供消费者执行者,则使用
SimpleAsyncTaskExecutor
;此执行程序创建名为<beanName>-C-1
(使用者线程)的线程。对于ConcurrentMessageListenerContainer
,线程名称的<beanName>
部分变为<beanName>-m
,其中m
表示消费者实例。每次启动容器时,n
都会递增。因此,使用容器的bean名称,容器第一次启动后,此容器中的线程将被命名为container-0-C-1
,container-1-C-1
等;停止/开始后,container-0-C-2
,container-1-C-2
等。
但是,当为Kafka(即KafkaAutoConfiguration
)启用Spring Boot的自动配置时,我找不到自定义执行程序的方法。
简而言之,设置(ContainerProperties#consumerTaskExecutor
)的正确方法是什么?
是的,你不能通过配置属性这样做,因为它不是那么简单的属性,但你可以覆盖bean:
@Bean
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
return configurer;
}
与您的自定义实现。并在其qazxsw poi方法中调用适当的qazxsw poi。