KafkaStream 在代理滚动升级时达到 ERROR 状态

问题描述 投票:0回答:1

我正在使用 Kafka 2.8.1 (AWS MSK)。每当 AWS 完成一些滚动升级时,我都会观察到我的 Kafka 流应用程序达到错误状态,并且在尝试查询状态存储时不断收到以下异常

java.lang.IllegalStateException: Error when retrieving state store.
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$3(InteractiveQueryService.java:237)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)
    at org.springframework.cloud.stream.binder.kafka.streams.**InteractiveQueryService.getHostInfo(InteractiveQueryService.java:222)**
    at com.sp.gos.processors.GossiperKVStoreQueryService.getHostInfo(GossiperKVStoreQueryService.java:108)
    at com.sp.gos.processors.GossiperKVStoreQueryService.get(GossiperKVStoreQueryService.java:68)
...
caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
    at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.HashMap$KeySpliterator.tryAdvance(HashMap.java:1728)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$3(InteractiveQueryService.java:228)

我还在属性中添加了一个日志并继续异常处理程序,如下所示,每当我遇到此问题时,我都看不到代码到达那里

props.put(
        "default.deserialization.exception.handler",
        LogAndContinueExceptionHandler.class);
props.put(
        "default.production.exception.handler",
        LogAndContinueProductionExceptionHandler.class);

有人可以告诉我出了什么问题吗?我的配置是否有任何错误导致了此问题?一旦我重新启动我的应用程序,一切就开始正常工作。

  • org.springframework.kafka:spring-kafka:jar
    - 3.0.9
  • org.apache.kafka:kafka-clients
    - 3.5.1
  • org.springframework.cloud:spring-cloud-stream-binder-kafka-streams
    - 4.0.4
  • org.apache.kafka:kafka-streams
    - 3.5.1
apache-kafka spring-kafka apache-kafka-streams spring-cloud-stream
1个回答
0
投票

虽然我没有使用异常处理程序,但为什么你期望你的处理程序类被调用。

您看到流(消费者)异常 - 因此不会调用生产处理程序。

同样,异常与数据的反序列化无关。

© www.soinside.com 2019 - 2024. All rights reserved.