DefaultKafkaConsumerFactory - ConcurrentHashMap - NullPointerException


I am using a Kafka consumer. I upgraded from Spring Boot 1.5 to 2.6, and now the application fails to start and throws a NullPointerException. If anyone can help me resolve this, please let me know.

Caused by: java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) ~[?:1.8.0_25]
    at java.util.concurrent.ConcurrentHashMap.putAll(ConcurrentHashMap.java:1084) ~[?:1.8.0_25]
    at java.util.concurrent.ConcurrentHashMap.<init>(ConcurrentHashMap.java:852) ~[?:1.8.0_25]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.<init>(DefaultKafkaConsumerFactory.java:129) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.<init>(DefaultKafkaConsumerFactory.java:98) ~[spring-kafka-2.8.4.jar:2.8.4]

Failed to instantiate [org.springframework.kafka.core.ConsumerFactory

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.6</version>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.2.12.RELEASE</version>
</dependency>

Receiver configuration class:

@Slf4j
@Configuration
@EnableKafka
@ExcludeKafkaReceiver
public class XXTriggerKakfaReceiverConfig {
   
    @Autowired
    xxTriggerKafkaManagedBean kafkaListenerConfig;

    @Bean
    public Map<String, Object> xxConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerConfig.getKafkaServerConfig());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageSerializerType.getValue(kafkaListenerConfig.getValueDeserializerClass()));
        props.put(Constants.AVRO_MSG_CLASS_TYPE, kafkaListenerConfig.getAvroMessageClassType());
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, kafkaListenerConfig.getPartitionAssignmentStrategy());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerConfig.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerConfig.getAutoOffsetReset());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerConfig.getEnableAutoCommit());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerConfig.getSessionTimeoutMS());
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerConfig.getHeartbeatIntervalMS());
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaListenerConfig.getRequestTimeoutMS());
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, kafkaListenerConfig.getFetchMaxWaitMS());
        props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, kafkaListenerConfig.getReconnectBackoffMS());
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaListenerConfig.getRetryBackoffMS());
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaListenerConfig.getMaxPartitionFetchSize());
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaListenerConfig.getFetchMinBytes());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerConfig.getMaxPollRecords());

        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> xxConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(xxConsumerConfigs());  // This line throws the NullPointerException
    }

    /**
     * A concurrent Kafka listener with concurrency managed through {concurrent_kafka_listeners} property value.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance 
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> xxKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxConsumerFactory());
        factory.setConcurrency(kafkaListenerConfig.getNumberOfConcurrentListeners());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ErrorHandler errorHandler = new KafkaListenerErrorHandler();
        errorHandler.setAckAfterHandle(false);        
        factory.setCommonErrorHandler((CommonErrorHandler) errorHandler);
        return factory;
    }

    /**
     * A concurrent Kafka batch listener with concurrency managed through {concurrent_kafka_listeners} property value.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> xxKafkaBatchListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxConsumerFactory());
        factory.setConcurrency(kafkaListenerConfig.getNumberOfConcurrentListeners());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ErrorHandler errorHandler = new KafkaListenerErrorHandler(); 
        errorHandler.setAckAfterHandle(false);        
        factory.setCommonErrorHandler((CommonErrorHandler) errorHandler);
        factory.setBatchListener(true);
        return factory;
    }
}
java spring spring-boot kafka-consumer-api spring-kafka
2 Answers

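I ran into the same error. Before the Spring Boot upgrade (v2.2.4 -> v2.7.3) the code worked fine; after the upgrade it started throwing NullPointerException. I strongly recommend debugging the code and stepping into the Spring library to see which property is failing. In my case it was

    SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG=
or
    ssl.endpoint.identification.algorithm=

Before the upgrade, setting it to null worked fine. After the upgrade, because of changes in the Kafka library, it has to be set to an empty string instead.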
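A minimal sketch of that change, written against the JDK only so it runs without Kafka on the classpath. The class name NullSslConfigFix and the method withEmptyEndpointAlgorithm are hypothetical names chosen for illustration, and the bootstrap server value is a placeholder. It assumes, as the stack trace in the question suggests, that the factory copies the configuration into a ConcurrentHashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NullSslConfigFix {

    // Property name quoted in the answer above.
    static final String ENDPOINT_ALGORITHM = "ssl.endpoint.identification.algorithm";

    // Returns a copy of the config in which a null endpoint algorithm
    // is replaced by an empty string; all other entries are left untouched.
    static Map<String, Object> withEmptyEndpointAlgorithm(Map<String, Object> props) {
        Map<String, Object> copy = new HashMap<>(props);
        if (copy.containsKey(ENDPOINT_ALGORITHM) && copy.get(ENDPOINT_ALGORITHM) == null) {
            copy.put(ENDPOINT_ALGORITHM, "");
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder value
        props.put(ENDPOINT_ALGORITHM, null);              // the setting that worked before the upgrade

        // With the null replaced, copying into a ConcurrentHashMap no longer throws.
        Map<String, Object> safe = new ConcurrentHashMap<>(withEmptyEndpointAlgorithm(props));
        System.out.println("endpoint algorithm is empty: " + "".equals(safe.get(ENDPOINT_ALGORITHM)));
    }
}
```

In the real configuration class the same replacement would be applied to the map before it is passed to DefaultKafkaConsumerFactory.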

I found help in this answer: https://stackoverflow.com/a/56648768/4898612

Good luck!



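Make sure that none of the values you read from kafkaListenerConfig is null, for example in props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerConfig.getKafkaServerConfig());

As the stack trace shows, the DefaultKafkaConsumerFactory constructor copies the configuration map into a ConcurrentHashMap, which does not permit null keys or values. If any entry is null, the constructor throws an NPE.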
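The check described above can be sketched with the JDK alone: copying a map that holds a null value into a ConcurrentHashMap reproduces the NullPointerException from the question's stack trace, and a small helper lists the offending keys. NullConfigCheck, nullValuedKeys and copyFails are hypothetical names for illustration, and the property values are placeholders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NullConfigCheck {

    // Collects the keys whose value is null, so they can be logged or fixed
    // before the map is handed to the consumer factory.
    static List<String> nullValuedKeys(Map<String, Object> props) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Object> entry : props.entrySet()) {
            if (entry.getValue() == null) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    // Returns true if copying the map into a ConcurrentHashMap throws an NPE.
    static boolean copyFails(Map<String, Object> props) {
        try {
            new ConcurrentHashMap<>(props);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder value
        props.put("group.id", null);                      // simulates a getter that returned null

        System.out.println("null-valued keys: " + nullValuedKeys(props));
        System.out.println("copy fails: " + copyFails(props));
    }
}
```

Running a check like nullValuedKeys on the map returned by xxConsumerConfigs() before constructing the factory should point at the getter that returns null.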
