Kafka Consumer 无法间歇性解析监听器方法

问题描述 投票:0回答:1

我在 Kafka 消费者端遇到了下面的异常。令人惊讶的是,这个问题“不一致”,并且旧版本的代码(具有完全相同的配置,但有一些新的不相关功能)按预期工作。谁能帮忙确定是什么原因造成的? [ERROR][938f3c68-f481-4224-b2c6-43af5fb27ada-0-C-1][org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Error handler threw an exception org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)] Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}] at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:77) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2010) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1854) [spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1720) [spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1699) [spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) [spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.1.jar:2.7.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at java.lang.Thread.run(Thread.java:834) [?:?] Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)] Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2367) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2003) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1] ... 8 more Caused by: org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:145) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1983) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1] ... 8 more

我的应用程序使用以下内容:

自定义侦听器类
    com.mycompany.listener.KafkaBatchListener<K, V>
  1. ,它
    实现
    org.springframework.kafka.listener.BatchAcknowledgingMessageListener<K, V> 并使用自定义标记注释
    onMessage(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment)
    覆盖
    @MyKafkaListener
    一个自定义容器工厂,它
    扩展
  2. org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V>并配置setConsumerFactory(consumerFactory)
    setBatchErrorHandler(errorHandler)
    setBatchListener(true)
    ContainerProperties.setOnlyLogRecordMetadata(true)
    SpringBoot 
    @Configuration
  3. 类,
  4. 实现
    org.springframework.kafka.annotation.KafkaListenerConfigurer并负责配置org.springframework.kafka.core.DefaultKafkaConsumerFactory<K, V>
    org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V>
    org.springframework.kafka.config.MethodKafkaListenerEndpoint<String, String>
    (由
    @MyKafkaListener
    使用)
    春季卡夫卡2.7.1
  5. 补充询问:
即使设置了

ContainerProperties.setOnlyLogRecordMetadata(true),异常堆栈跟踪仍然包含我省略的完整payload

。知道为什么吗?
提前致谢!

更新:

KafkaBatchListener

  1. package com.mycompany.listener; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.BatchAcknowledgingMessageListener; import org.springframework.kafka.support.Acknowledgment; public class KafkaBatchListener<K, V> implements BatchAcknowledgingMessageListener<K, V> { @Override @com.mycompany.listener.KafkaListener public void onMessage(final List<ConsumerRecord<K, V>> consumerRecords, final Acknowledgment acknowledgment) { // process batch using MyService<K, V>.process(consumerRecords) acknowledgment.acknowledge(); } }
自定义注释
  1. package com.mycompany.listener; import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.RetentionPolicy.RUNTIME; import java.lang.annotation.Retention; import java.lang.annotation.Target; @Retention(RUNTIME) @Target(METHOD) public @interface KafkaListener { }
监听器容器工厂
  1. package com.mycompany.factory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler; public class KafkaBatchListenerContainerFactory<K, V> extends ConcurrentKafkaListenerContainerFactory<K, V> { public KafkaBatchListenerContainerFactory(final DefaultKafkaConsumerFactory<K, V> consumerFactory, final ListenerContainerRecoveringBatchErrorHandler errorHandler, final int concurrency) { super.setConsumerFactory(consumerFactory); super.setBatchErrorHandler(errorHandler); super.setConcurrency(concurrency); super.setBatchListener(true); super.setAutoStartup(true); final ContainerProperties containerProperties = super.getContainerProperties(); containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL); containerProperties.setOnlyLogRecordMetadata(true); } }
批量错误处理程序
  1. package com.mycompany.errorhandler; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.listener.RecoveringBatchErrorHandler; import org.springframework.stereotype.Component; import org.springframework.util.backoff.FixedBackOff; @Component public class ListenerContainerRecoveringBatchErrorHandler extends RecoveringBatchErrorHandler { public ListenerContainerRecoveringBatchErrorHandler( @Value("${spring.kafka.consumer.properties.backOffMS:0}") final int backOffTimeMS, @Value("${spring.kafka.consumer.properties.retries:3}") final int retries) { super(new FixedBackOff(backOffTimeMS, retries)); } }
Kafka 监听器配置器
  1. package com.mycompany.config; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListenerConfigurer; import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler; import com.mycompany.factory.KafkaBatchListenerContainerFactory; import com.mycompany.listener.KafkaBatchListener; @Configuration public class KafkaBatchListenerConfigurer<K, V> implements KafkaListenerConfigurer { private final List<KafkaBatchListener<K, V>> listeners; private final BeanFactory beanFactory; private final ListenerContainerRecoveringBatchErrorHandler errorHandler; private final int concurrency; @Autowired public KafkaBatchListenerConfigurer(final List<KafkaBatchListener<K, V>> listeners, final BeanFactory beanFactory, final ListenerContainerRecoveringBatchErrorHandler errorHandler, @Value("${spring.kafka.listener.concurrency:1}") final int concurrency) { this.listeners = listeners; this.beanFactory = beanFactory; this.errorHandler = errorHandler; this.concurrency = concurrency; } @Override public void configureKafkaListeners(final KafkaListenerEndpointRegistrar registrar) { final Method listenerMethod = lookUpBatchListenerMethod(); listeners.forEach(listener -> { registerListenerEndpoint(listener, listenerMethod, registrar); }); } private void registerListenerEndpoint(final KafkaBatchListener<K, V> listener, final Method listenerMethod, final KafkaListenerEndpointRegistrar registrar) { // final Map<String, Object> consumerConfig = get ConsumerConfig from a custom provider; registrar.setContainerFactory(createContainerFactory(consumerConfig)); registrar.registerEndpoint(createListenerEndpoint(listener, listenerMethod, consumerConfig)); } private KafkaBatchListenerContainerFactory<K, V> createContainerFactory(final Map<String, Object> consumerConfig) { final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig); final KafkaBatchListenerContainerFactory<K, V> containerFactory = new KafkaBatchListenerContainerFactory<>( consumerFactory, errorHandler, concurrency); return containerFactory; } private MethodKafkaListenerEndpoint<String, String> createListenerEndpoint(final KafkaBatchListener<K, V> listener, final Method listenerMethod, final Map<String, Object> consumerConfig) { final MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setId(UUID.randomUUID().toString()); endpoint.setBean(listener); endpoint.setMethod(listenerMethod); endpoint.setBeanFactory(beanFactory); endpoint.setGroupId("my-group-id"); endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); // final String topicName = get TopicName for this key-value from a custom utility; endpoint.setTopics(topicName); final Properties properties = new Properties(); properties.putAll(consumerConfig); endpoint.setConsumerProperties(properties); return endpoint; } private Method lookUpBatchListenerMethod() { return Arrays.stream(com.mycompany.listener.KafkaBatchListener.class.getMethods()) .filter(m -> m.isAnnotationPresent(com.mycompany.listener.KafkaListener.class)) .findAny() .orElseThrow(() -> new IllegalStateException( String.format("[%s] class should have at least 1 method with [%s] annotation.", com.mycompany.listener.KafkaBatchListener.class.getCanonicalName(), com.mycompany.listener.KafkaListener.class.getCanonicalName()))); } }
当您的侦听器已经实现了消息侦听器接口之一时,您不需要调用基础结构的所有标准 
spring spring-boot apache-kafka spring-kafka kafka-consumer-api
1个回答
1
投票
方法;无需为每个侦听器注册端点,只需从工厂为每个侦听器创建一个容器并将侦听器添加到容器属性即可。

val container = containerFactory.createContainer("topic1");
container.getContainerProperties().set...
...
container.getContainerProperies().setMessageListener(myListenerInstance);
...
container.start();


© www.soinside.com 2019 - 2024. All rights reserved.