在使用自动配置时,如何为Spring Kafka设置自定义使用者任务执行程序?

问题描述 投票:0回答:1

226-230org.springframework.kafka.listener.KafkaMessageListenerContainer行中,Spring Kafka将默认的SimpleAsyncTaskExecutor指定为侦听器容器的使用者任务执行器:

    if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }

在文档中,指出使用自定义执行程序而不是possibleSimpleAsyncTaskExecutor

如果您不提供消费者执行者,则使用SimpleAsyncTaskExecutor;此执行程序创建名为<beanName>-C-1(使用者线程)的线程。对于ConcurrentMessageListenerContainer,线程名称的<beanName>部分变为<beanName>-m,其中m表示消费者实例。每次启动容器时,n都会递增。因此,使用容器的bean名称,容器第一次启动后,此容器中的线程将被命名为container-0-C-1container-1-C-1等;停止/开始后,container-0-C-2container-1-C-2等。

但是,当为Kafka(即KafkaAutoConfiguration)启用Spring Boot的自动配置时,我找不到自定义执行程序的方法。

简而言之,设置(ContainerProperties#consumerTaskExecutor)的正确方法是什么?

spring-boot spring-kafka
1个回答
0
投票

是的,你不能通过配置属性这样做,因为它不是那么简单的属性,但你可以覆盖bean:

@Bean
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
    configurer.setKafkaProperties(this.properties);
    return configurer;
}

与您的自定义实现。并在其qazxsw poi方法中调用适当的qazxsw poi。

© www.soinside.com 2019 - 2024. All rights reserved.