Spring Cloud Streams 错误处理不起作用

问题描述 投票:0回答:1

Spring Cloud Streams 中的不同错误处理方法的示例非常少,并且通过文档部分提供的少数示例似乎也不起作用。

我有一个测试存储库,尝试了多种错误捕获方法,但这些方法都不起作用。

Spring Cloud Streams 具有可靠的反序列化和序列化错误处理,但来自映射、转换和处理器方法的错误处理的记录非常少。

示例存储库:https://github.com/StevenPG/scs-experimentation/tree/main/scs4-error-handling/error-handling

我只有两个主要文件

@SpringBootApplication
public class ErrorHandlingApplication {

    public final Random randomNumberGenerator = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        SpringApplication.run(ErrorHandlingApplication.class, args);
    }

    @Bean
    public Supplier<Message<String>> randomIntegerPublisher() {
        return () -> MessageBuilder
                .withPayload(String.valueOf(randomNumberGenerator.nextInt()))
                .setHeader(KafkaHeaders.RECEIVED_KEY, 0)
                .build();
    }

    @Bean
    public Consumer<KStream<String, String>> errorStream() {
        return input -> input
                // Remove odd numbers so we throw an exception on every other message
                .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                .filter((key, value) -> (value & 1) == 0)
                .map((key, value) -> {
                    throw new RuntimeException("Pushing uncaught error to kill stream!");
                }
        );
    }

    @Bean
    public Consumer<KStream<String, String>> errorHandledStream() {
        return input -> input
                // Remove odd numbers so we throw an exception on ever other message
                .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                .filter((key, value) -> (value & 1) == 0)
                .map((key, value) -> {
                            System.out.println("This should not kill the stream");
                            throw new RuntimeException("Publishing error to be caught!");
                        }
                );
    }

    @Bean
    // TODO - doesn't seem to be working, is this because we're using kstreams?
    public Consumer<ErrorMessage> defaultErrorHandler() {
        return v -> {
            System.out.println("Caught and handling error");
            System.out.println(v.toString());
        };
    }

    @Bean
    // TODO - not working via the config
    /**
     * bindings:
     *   errorHandledStream-in-0:
     *     consumer:
     *       commonErrorHandlerBeanName: defaultCommonErrorHandler
     */
    public CommonErrorHandler defaultCommonErrorHandler() {
        return new CommonLoggingErrorHandler();
    }

    /**
     * Also not working
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(defaultCommonErrorHandler());
        return factory;
    }
}

spring:
  cloud:
    function:
      definition: randomIntegerPublisher;errorStream;errorHandledStream;defaultErrorHandler
    stream:
      default:
        error-handler-definition: defaultErrorHandler
      kafka:
        streams:
          binder:
            deserialization-exception-handler: logandcontinue
          bindings:
            errorHandledStream-in-0:
              error-handler-definition: defaultErrorHandler
              consumer:
                commonErrorHandlerBeanName: defaultCommonErrorHandler
        bindings:
          errorHandledStream-in-0:
            consumer:
              commonErrorHandlerBeanName: defaultCommonErrorHandler
      bindings:
        randomIntegerPublisher-out-0:
          destination: integer-topic
        errorStream-in-0:
          destination: integer-topic
        errorHandledStream-in-0:
          destination: integer-topic
          error-handler-definition: defaultErrorHandler

几乎所有已记录的错误处理变体似乎都无法正常运行。

我的第一个流,errorStream 按预期运行。杀死相关的消费者(尽管全局配置应该捕获这一点)。

第二个流,errorHandledStream 尝试提供捕获错误的配置。

主要的要求是,当映射方法中发生异常时(对于本示例),能够让某些异常处理程序执行操作,以便流不会崩溃并重新启动。

这都是最新的 spring-cloud-streams 版本以及以下依赖项。

extra["springCloudVersion"] = "2022.0.3"

    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")

使用了以下参考文献:

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling

我在这里缺少什么,和/或我可以使用什么参考来审查和实施。或者,是否有一个工作示例发布在任何地方(或可以在此处提供)作为起点?

spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka kafka-streams-binder
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.