Spring Cloud Streams 中的不同错误处理方法的示例非常少,并且通过文档部分提供的少数示例似乎也不起作用。
我有一个测试存储库,尝试了多种错误捕获方法,但这些方法都不起作用。
Spring Cloud Streams 具有可靠的反序列化和序列化错误处理,但来自映射、转换和处理器方法的错误处理的记录非常少。
示例存储库:https://github.com/StevenPG/scs-experimentation/tree/main/scs4-error-handling/error-handling
我只有两个主要文件
@SpringBootApplication
public class ErrorHandlingApplication {
public final Random randomNumberGenerator = new Random(System.currentTimeMillis());
public static void main(String[] args) {
SpringApplication.run(ErrorHandlingApplication.class, args);
}
@Bean
public Supplier<Message<String>> randomIntegerPublisher() {
return () -> MessageBuilder
.withPayload(String.valueOf(randomNumberGenerator.nextInt()))
.setHeader(KafkaHeaders.RECEIVED_KEY, 0)
.build();
}
@Bean
public Consumer<KStream<String, String>> errorStream() {
return input -> input
// Remove odd numbers so we throw an exception on every other message
.map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
.filter((key, value) -> (value & 1) == 0)
.map((key, value) -> {
throw new RuntimeException("Pushing uncaught error to kill stream!");
}
);
}
@Bean
public Consumer<KStream<String, String>> errorHandledStream() {
return input -> input
// Remove odd numbers so we throw an exception on ever other message
.map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
.filter((key, value) -> (value & 1) == 0)
.map((key, value) -> {
System.out.println("This should not kill the stream");
throw new RuntimeException("Publishing error to be caught!");
}
);
}
@Bean
// TODO - doesn't seem to be working, is this because we're using kstreams?
public Consumer<ErrorMessage> defaultErrorHandler() {
return v -> {
System.out.println("Caught and handling error");
System.out.println(v.toString());
};
}
@Bean
// TODO - not working via the config
/**
* bindings:
* errorHandledStream-in-0:
* consumer:
* commonErrorHandlerBeanName: defaultCommonErrorHandler
*/
public CommonErrorHandler defaultCommonErrorHandler() {
return new CommonLoggingErrorHandler();
}
/**
* Also not working
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(defaultCommonErrorHandler());
return factory;
}
}
和
spring:
cloud:
function:
definition: randomIntegerPublisher;errorStream;errorHandledStream;defaultErrorHandler
stream:
default:
error-handler-definition: defaultErrorHandler
kafka:
streams:
binder:
deserialization-exception-handler: logandcontinue
bindings:
errorHandledStream-in-0:
error-handler-definition: defaultErrorHandler
consumer:
commonErrorHandlerBeanName: defaultCommonErrorHandler
bindings:
errorHandledStream-in-0:
consumer:
commonErrorHandlerBeanName: defaultCommonErrorHandler
bindings:
randomIntegerPublisher-out-0:
destination: integer-topic
errorStream-in-0:
destination: integer-topic
errorHandledStream-in-0:
destination: integer-topic
error-handler-definition: defaultErrorHandler
几乎所有已记录的错误处理变体似乎都无法正常运行。
我的第一个流,errorStream 按预期运行。杀死相关的消费者(尽管全局配置应该捕获这一点)。
第二个流,errorHandledStream 尝试提供捕获错误的配置。
主要的要求是,当映射方法中发生异常时(对于本示例),能够让某些异常处理程序执行操作,以便流不会崩溃并重新启动。
这都是最新的 spring-cloud-streams 版本以及以下依赖项。
extra["springCloudVersion"] = "2022.0.3"
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
使用了以下参考文献:
我在这里缺少什么,和/或我可以使用什么参考来审查和实施。或者,是否有一个工作示例发布在任何地方(或可以在此处提供)作为起点?
此处请求的功能将作为 https://github.com/spring-cloud/spring-cloud-stream/issues/2779 的一部分添加。