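I'm hitting the exception below on the Kafka consumer side. Surprisingly, the problem is intermittent, and an older version of the code (with exactly the same configuration, differing only by some new, unrelated features) works as expected. Can anyone help identify the cause?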
[ERROR][938f3c68-f481-4224-b2c6-43af5fb27ada-0-C-1][org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:77) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2010) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1854) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1720) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1699) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) [spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.1.jar:2.7.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2367) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2003) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
... 8 more
Caused by: org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:145) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1983) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
... 8 more
My application uses the following:
A custom listener class, com.mycompany.listener.KafkaBatchListener<K, V>, which implements org.springframework.kafka.listener.BatchAcknowledgingMessageListener<K, V> and overrides onMessage(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment), annotated with the custom marker annotation @MyKafkaListener.
A custom container factory that extends org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V> and configures setConsumerFactory(consumerFactory), setBatchErrorHandler(errorHandler), setBatchListener(true), and ContainerProperties.setOnlyLogRecordMetadata(true).
A Spring Boot @Configuration class that implements org.springframework.kafka.annotation.KafkaListenerConfigurer and is responsible for configuring org.springframework.kafka.core.DefaultKafkaConsumerFactory<K, V>, org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V>, and org.springframework.kafka.config.MethodKafkaListenerEndpoint<String, String> (used by @MyKafkaListener).
Spring Kafka 2.7.1
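Also, even though ContainerProperties.setOnlyLogRecordMetadata(true) is set, the exception stack trace still contains the full payload, which I have omitted here. Any idea why? Thanks in advance!
Update: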
KafkaBatchListener
package com.mycompany.listener;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

public class KafkaBatchListener<K, V> implements BatchAcknowledgingMessageListener<K, V> {
    @Override
    @com.mycompany.listener.KafkaListener
    public void onMessage(final List<ConsumerRecord<K, V>> consumerRecords, final Acknowledgment acknowledgment) {
        // process batch using MyService<K, V>.process(consumerRecords)
        acknowledgment.acknowledge();
    }
}
Custom annotation
package com.mycompany.listener;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

@Retention(RUNTIME)
@Target(METHOD)
public @interface KafkaListener {
}
Listener container factory
package com.mycompany.factory;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;

public class KafkaBatchListenerContainerFactory<K, V>
        extends ConcurrentKafkaListenerContainerFactory<K, V> {
    public KafkaBatchListenerContainerFactory(final DefaultKafkaConsumerFactory<K, V> consumerFactory,
            final ListenerContainerRecoveringBatchErrorHandler errorHandler, final int concurrency) {
        super.setConsumerFactory(consumerFactory);
        super.setBatchErrorHandler(errorHandler);
        super.setConcurrency(concurrency);
        super.setBatchListener(true);
        super.setAutoStartup(true);
        final ContainerProperties containerProperties = super.getContainerProperties();
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
        containerProperties.setOnlyLogRecordMetadata(true);
    }
}
Batch error handler
package com.mycompany.errorhandler;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

@Component
public class ListenerContainerRecoveringBatchErrorHandler extends RecoveringBatchErrorHandler {
    public ListenerContainerRecoveringBatchErrorHandler(
            @Value("${spring.kafka.consumer.properties.backOffMS:0}") final int backOffTimeMS,
            @Value("${spring.kafka.consumer.properties.retries:3}") final int retries) {
        super(new FixedBackOff(backOffTimeMS, retries));
    }
}
Kafka listener configurer
package com.mycompany.config;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;
import com.mycompany.factory.KafkaBatchListenerContainerFactory;
import com.mycompany.listener.KafkaBatchListener;

@Configuration
public class KafkaBatchListenerConfigurer<K, V> implements KafkaListenerConfigurer {
    private final List<KafkaBatchListener<K, V>> listeners;
    private final BeanFactory beanFactory;
    private final ListenerContainerRecoveringBatchErrorHandler errorHandler;
    private final int concurrency;

    @Autowired
    public KafkaBatchListenerConfigurer(final List<KafkaBatchListener<K, V>> listeners, final BeanFactory beanFactory,
            final ListenerContainerRecoveringBatchErrorHandler errorHandler,
            @Value("${spring.kafka.listener.concurrency:1}") final int concurrency) {
        this.listeners = listeners;
        this.beanFactory = beanFactory;
        this.errorHandler = errorHandler;
        this.concurrency = concurrency;
    }

    @Override
    public void configureKafkaListeners(final KafkaListenerEndpointRegistrar registrar) {
        final Method listenerMethod = lookUpBatchListenerMethod();
        listeners.forEach(listener -> {
            registerListenerEndpoint(listener, listenerMethod, registrar);
        });
    }

    private void registerListenerEndpoint(final KafkaBatchListener<K, V> listener, final Method listenerMethod,
            final KafkaListenerEndpointRegistrar registrar) {
        // final Map<String, Object> consumerConfig = get ConsumerConfig from a custom provider;
        registrar.setContainerFactory(createContainerFactory(consumerConfig));
        registrar.registerEndpoint(createListenerEndpoint(listener, listenerMethod, consumerConfig));
    }

    private KafkaBatchListenerContainerFactory<K, V> createContainerFactory(final Map<String, Object> consumerConfig) {
        final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);
        final KafkaBatchListenerContainerFactory<K, V> containerFactory = new KafkaBatchListenerContainerFactory<>(
                consumerFactory, errorHandler, concurrency);
        return containerFactory;
    }

    private MethodKafkaListenerEndpoint<String, String> createListenerEndpoint(final KafkaBatchListener<K, V> listener,
            final Method listenerMethod, final Map<String, Object> consumerConfig) {
        final MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setId(UUID.randomUUID().toString());
        endpoint.setBean(listener);
        endpoint.setMethod(listenerMethod);
        endpoint.setBeanFactory(beanFactory);
        endpoint.setGroupId("my-group-id");
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        // final String topicName = get TopicName for this key-value from a custom utility;
        endpoint.setTopics(topicName);
        final Properties properties = new Properties();
        properties.putAll(consumerConfig);
        endpoint.setConsumerProperties(properties);
        return endpoint;
    }

    private Method lookUpBatchListenerMethod() {
        return Arrays.stream(com.mycompany.listener.KafkaBatchListener.class.getMethods())
                .filter(m -> m.isAnnotationPresent(com.mycompany.listener.KafkaListener.class))
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                        String.format("[%s] class should have at least 1 method with [%s] annotation.",
                                com.mycompany.listener.KafkaBatchListener.class.getCanonicalName(),
                                com.mycompany.listener.KafkaListener.class.getCanonicalName())));
    }
}
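A possible explanation for the intermittent failure, offered as a hypothesis rather than a confirmed diagnosis: the "Endpoint handler details" in the stack trace show the method as onMessage(java.lang.Object, Acknowledgment), which looks like the compiler-generated bridge method rather than the List-typed override. Since Java 8, javac copies method annotations onto bridge methods, and Class.getMethods() returns methods in no particular order, so a lookup that ends in findAny() may pick either one. The stand-alone sketch below uses hypothetical stand-ins (Marker, GenericListener, BatchListener, and a String in place of Acknowledgment) to show the bridge method and a lookup that skips it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins only; none of these types come from Spring Kafka or the question's code.
final class BridgeMethodDemo {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Marker {
    }

    // Plays the role of a generic listener interface; String stands in for Acknowledgment.
    public interface GenericListener<T> {
        void onMessage(T data, String ack);
    }

    // Implementing the interface with T = List<V> makes javac emit a synthetic
    // bridge method onMessage(Object, String) next to the declared override.
    public static class BatchListener<V> implements GenericListener<List<V>> {
        @Override
        @Marker
        public void onMessage(final List<V> data, final String ack) {
            // no-op
        }
    }

    // Same lookup idea as in the question, but bridge methods are filtered out
    // so that only the declared List-typed method can be selected.
    public static Method findListenerMethod(final Class<?> type) {
        return Arrays.stream(type.getMethods())
                .filter(m -> m.isAnnotationPresent(Marker.class))
                .filter(m -> !m.isBridge())
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no annotated non-bridge method on " + type));
    }

    public static void main(final String[] args) {
        // Print every onMessage candidate that reflection exposes.
        for (final Method m : BatchListener.class.getMethods()) {
            if (m.getName().equals("onMessage")) {
                System.out.println(m + " bridge=" + m.isBridge()
                        + " annotated=" + m.isAnnotationPresent(Marker.class));
            }
        }
        System.out.println("selected: " + findListenerMethod(BatchListener.class));
    }
}
```

Applied to the question's code, the equivalent change would be an extra .filter(m -> !m.isBridge()) in lookUpBatchListenerMethod(). Whether that alone removes the reported exception has not been verified here.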
val container = containerFactory.createContainer("topic1");
container.getContainerProperties().set...
...
container.getContainerProperties().setMessageListener(myListenerInstance);
...
container.start();