我有简单的spring cloud kafka流应用程序。每当出现异常时,应用程序都会终止,而我无法覆盖此行为。当存在某些类型的异常时,或者在其他类型的异常上继续时,期望的结果是增加退避。我使用springCloudVersion - Hoxton.SR3
和spring boot: 2.2.6.RELEASE
application.yaml
spring:
cloud:
stream:
binders.process-in-0:
destination: test
kafka:
streams:
binder:
deserializationExceptionHandler: logAndContinue
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
豆
@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
return input -> input.process(() -> new EventProcessor());
}
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
ContinueOnErrorHandler.class);
};
}
EventProcessor
public class EventProcessor implements Processor<String, String>, ProcessorSupplier<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(String key, String value) {
throw new RuntimeException("Some exception");
}
@Override
public void close() {
}
@Override
public Processor<String, String> get() {
return this;
}
}
ContinueOnErrorHandler
public class ContinueOnErrorHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(Map<String, ?> configs) {
//ignore
}
}
您正在使用的自定义处理器正在使用RuntimeException
方法抛出process
。它没有被任何东西捕获。抛出该异常时,应用程序将直接退出。
您正在使用的生产异常处理程序在这里没有任何作用,因为您在这里没有生产任何东西。 Consumer
不产生任何东西。如果您有使用某些东西的用例,则应改用java.util.funciton.Function
。
为了在此处解决此问题,当您在自定义处理器(EventProcessor
)中处理记录时,如果遇到异常,则应捕获该异常并采取适当的措施。例如,这是一个模板:
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(String key, String value) {
try {
// start processing
// exception thrown
}
catch (Exception e){
// Take the appropriate action
}
}
这样,当处理器中引发异常时,应用程序将不会终止。