I am using a Kafka consumer. I upgraded from Spring Boot 1.5 to 2.6, and now the application fails to start, throwing a NullPointerException. If anyone can help me resolve this, please let me know.
Caused by: java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) ~[?:1.8.0_25]
at java.util.concurrent.ConcurrentHashMap.putAll(ConcurrentHashMap.java:1084) ~[?:1.8.0_25]
at java.util.concurrent.ConcurrentHashMap.<init>(ConcurrentHashMap.java:852) ~[?:1.8.0_25]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.<init>(DefaultKafkaConsumerFactory.java:129) ~[spring-kafka-2.8.4.jar:2.8.4]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.<init>(DefaultKafkaConsumerFactory.java:98) ~[spring-kafka-2.8.4.jar:2.8.4]
Failed to instantiate [org.springframework.kafka.core.ConsumerFactory
pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.6</version>
</parent>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.2.12.RELEASE</version>
</dependency>
@Slf4j
@Configuration
@EnableKafka
@ExcludeKafkaReceiver
public class XXTriggerKakfaReceiverConfig {

    @Autowired
    xxTriggerKafkaManagedBean kafkaListenerConfig;

    @Bean
    public Map<String, Object> xxConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerConfig.getKafkaServerConfig());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageSerializerType.getValue(kafkaListenerConfig.getValueDeserializerClass()));
        props.put(Constants.AVRO_MSG_CLASS_TYPE, kafkaListenerConfig.getAvroMessageClassType());
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, kafkaListenerConfig.getPartitionAssignmentStrategy());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerConfig.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerConfig.getAutoOffsetReset());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerConfig.getEnableAutoCommit());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerConfig.getSessionTimeoutMS());
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerConfig.getHeartbeatIntervalMS());
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaListenerConfig.getRequestTimeoutMS());
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, kafkaListenerConfig.getFetchMaxWaitMS());
        props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, kafkaListenerConfig.getReconnectBackoffMS());
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaListenerConfig.getRetryBackoffMS());
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaListenerConfig.getMaxPartitionFetchSize());
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaListenerConfig.getFetchMinBytes());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerConfig.getMaxPollRecords());
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> xxConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(xxConsumerConfigs()); // This is where the NullPointerException is thrown
    }

    /**
     * A concurrent Kafka listener with concurrency managed through {concurrent_kafka_listeners} property value.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> xxKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxConsumerFactory());
        factory.setConcurrency(kafkaListenerConfig.getNumberOfConcurrentListeners());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ErrorHandler errorHandler = new KafkaListenerErrorHandler();
        errorHandler.setAckAfterHandle(false);
        factory.setCommonErrorHandler((CommonErrorHandler) errorHandler);
        return factory;
    }

    /**
     * A concurrent Kafka batch listener with concurrency managed through {concurrent_kafka_listeners} property value.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> xxKafkaBatchListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxConsumerFactory());
        factory.setConcurrency(kafkaListenerConfig.getNumberOfConcurrentListeners());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ErrorHandler errorHandler = new KafkaListenerErrorHandler();
        errorHandler.setAckAfterHandle(false);
        factory.setCommonErrorHandler((CommonErrorHandler) errorHandler);
        factory.setBatchListener(true);
        return factory;
    }
}
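I ran into the same error. Before the Spring Boot upgrade (v2.2.4 -> v2.7.3) the code worked fine; after the upgrade it started throwing a NullPointerException. I strongly recommend debugging the code and stepping into the Spring library to find out which property is causing the failure. In my case it was SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG (that is, ssl.endpoint.identification.algorithm=), which worked fine as null before the upgrade. After the upgrade, because of a change in the Kafka library, it has to be set to an empty string instead.
I found help in this answer: https://stackoverflow.com/a/56648768/4898612.
Good luck!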
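As a minimal sketch of the fix described above, the property can be stored as an empty string whenever the configured value is null. The helper name putEndpointIdAlgorithm and the class name are hypothetical and used only for illustration; the key string is the one quoted in the answer. Note that, as far as I know, an empty value for this property turns off server hostname verification, so it is worth confirming that this is what you want.

```java
import java.util.HashMap;
import java.util.Map;

class SslEndpointIdExample {

    // Property key quoted in the answer above; SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
    // holds the same string.
    static final String SSL_ENDPOINT_ID_ALGORITHM = "ssl.endpoint.identification.algorithm";

    // Hypothetical helper: stores "" instead of null so the map never contains a null value.
    static void putEndpointIdAlgorithm(Map<String, Object> props, String configured) {
        props.put(SSL_ENDPOINT_ID_ALGORITHM, configured == null ? "" : configured);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        putEndpointIdAlgorithm(props, null); // the value that used to be passed as null
        System.out.println("value is empty string: " + "".equals(props.get(SSL_ENDPOINT_ID_ALGORITHM)));
    }
}
```

In the question's configuration class, the same idea would apply inside xxConsumerConfigs() to any getter that can return null.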
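Make sure that none of the values obtained from kafkaListenerConfig is null, for example in props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerConfig.getKafkaServerConfig());
The stack trace shows the DefaultKafkaConsumerFactory constructor copying the config map into a ConcurrentHashMap, which throws an NPE as soon as it encounters a null key or value.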
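One way to find the offending property without stepping through the debugger is to scan the map for null values before it is handed to the factory. The sketch below uses only java.util so it runs on its own; the class and method names (NullConfigCheck, nullValuedKeys) are hypothetical, and the sample keys merely stand in for the real consumer properties.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NullConfigCheck {

    // Returns every key whose value is null, in the map's iteration order.
    static List<String> nullValuedKeys(Map<String, Object> props) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Object> entry : props.entrySet()) {
            if (entry.getValue() == null) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", null); // simulates a getter that returned null

        System.out.println("null-valued keys: " + nullValuedKeys(props));

        // Reproduces the failure from the stack trace: ConcurrentHashMap rejects null values.
        try {
            new ConcurrentHashMap<>(props);
        } catch (NullPointerException e) {
            System.out.println("ConcurrentHashMap rejected the map");
        }
    }
}
```

Logging the result of such a check inside xxConsumerConfigs(), just before return props, should name the getter that needs a default value.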