spring-cloud-stream-binder-kafka-streams consumer shuts down when a RuntimeException occurs


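With spring-cloud-stream-binder-kafka-streams, when an exception is thrown in the consumer, the consumer stops and goes into the EMPTY state. I want to test the retry mechanism, but it does not work as expected (https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling). No messages are sent to the DLQ. Any ideas?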

springBootVersion=3.2.4
springDependencyManagerVersion=1.1.4
springCloudVersion=2023.0.0
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
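import java.util.function.Function;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class CashbackBoosterProcessor {

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processPosCashbackBooster() {

        return event -> event
                .mapValues(v -> {
                    // Always throws; the `if (true)` guard keeps the compiler
                    // from rejecting the return below as unreachable.
                    if (true) {
                        throw new IllegalArgumentException("Testing of retry mechanism");
                    }
                    return v;
                });
    }
}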
spring:
  cloud:
    function:
      definition: processPosCashbackBooster

    stream:
      bindings:
        processPosCashbackBooster-in-0:
          destination: input-topic
          consumer:
            concurrency: 2
            max-attempts: 3

        processPosCashbackBooster-out-0:
          destination: output-topic

      kafka:
        streams:
          bindings:
            processPosCashbackBooster-in-0:
              consumer:
                enable-dlq: true
                dlq-name: input-dlq
                configuration:
                  application.id: ${spring.application.name}-input-group
          binder:
            deserialization-exception-handler: sendtodlq
org.apache.kafka.streams.KafkaStreams    : stream-client [demo-8ed02931-b9d7-4743-a741-627001eb74de] State transition from RUNNING to PENDING_ERROR
o.a.k.s.p.internals.StreamThread         : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
o.a.k.s.p.internals.StreamThread         : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] Shutting down unclean
o.a.k.s.p.internals.StreamThread         : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] Informed to shut down
org.apache.kafka.streams.KafkaStreams    : stream-client [demo-8ed02931-b9d7-4743-a741-627001eb74de] Shutting down 1 stream threads
o.a.k.s.processor.internals.StreamTask   : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] task [0_0] Suspended from RUNNING
o.a.k.s.p.internals.RecordCollectorImpl  : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] stream-task [0_0] Closing record collector dirty
o.a.k.s.processor.internals.StreamTask   : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] task [0_0] Closed dirty
o.a.k.clients.producer.KafkaProducer     : [Producer clientId=demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
java spring-boot apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka
1 Answer

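As mentioned in the comments above, sendToDlq is a deserialization-exception-handler, so it is triggered only by actual deserialization errors. What you have in your code is a runtime error during the execution of business logic. For that, you need a custom alternative, such as the DltAwareProcessor that the binder provides out of the box. See the binder documentation for more details. There is also a sample application that demonstrates this feature.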
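
To make the difference concrete, here is a minimal plain-Java sketch of the idea behind a DLT-aware processor: the business function is wrapped, a RuntimeException is caught, the failed record is diverted to a dead-letter sink, and processing continues. This is only a model and not the binder's API. The class DltAwareSketch, the dltAware helper, and the in-memory deadLetters list are hypothetical names made up for illustration; the real DltAwareProcessor has its own constructor arguments and publishes to a Kafka topic, so check the binder documentation before using it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java model of the idea; this is NOT the binder's DltAwareProcessor API.
public class DltAwareSketch {

    // Hypothetical stand-in for a dead-letter topic: failed inputs are collected here.
    static final List<String> deadLetters = new ArrayList<>();

    // Wraps business logic so that a RuntimeException diverts the record to the
    // dead-letter sink instead of propagating (which is what stops the stream thread).
    static <V, R> Function<V, Optional<R>> dltAware(Function<V, R> delegate) {
        return value -> {
            try {
                return Optional.of(delegate.apply(value));
            } catch (RuntimeException e) {
                deadLetters.add(String.valueOf(value));
                return Optional.empty(); // nothing is forwarded downstream
            }
        };
    }

    // Runs every input through the wrapped function and returns the forwarded values.
    static List<String> process(List<String> inputs) {
        Function<String, Optional<String>> safe = dltAware(v -> {
            if (v.startsWith("bad")) {
                throw new IllegalArgumentException("Testing of retry mechanism");
            }
            return v.toUpperCase();
        });
        List<String> forwarded = new ArrayList<>();
        for (String in : inputs) {
            safe.apply(in).ifPresent(forwarded::add);
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<String> forwarded = process(List.of("a", "bad-1", "b"));
        System.out.println("forwarded=" + forwarded + " deadLetters=" + deadLetters);
    }
}
```

In this model, the record that fails in business logic ends up in deadLetters while the others are still forwarded. A deserialization-exception-handler never sees this kind of failure, because by the time mapValues runs the record has already been deserialized successfully.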
