Spring cloud kafka流应用程序异常终止

问题描述 投票:0回答:1

我有简单的spring cloud kafka流应用程序。每当出现异常时,应用程序都会终止,而我无法覆盖此行为。当存在某些类型的异常时,或者在其他类型的异常上继续时,期望的结果是增加退避。我使用springCloudVersion - Hoxton.SR3spring boot: 2.2.6.RELEASE

application.yaml

spring:
 cloud:
   stream:
     binders.process-in-0:
         destination: test

     kafka:
       streams:
         binder:
           deserializationExceptionHandler: logAndContinue
           configuration:
             default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
             default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
    return input -> input.process(() -> new EventProcessor());
}

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                ContinueOnErrorHandler.class);
    };
}

EventProcessor

public class EventProcessor implements Processor<String, String>, ProcessorSupplier<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {

        throw new RuntimeException("Some exception");
    }

    @Override
    public void close() {

    }

    @Override
    public Processor<String, String> get() {
        return this;
    }
}

ContinueOnErrorHandler

public class ContinueOnErrorHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE;
}

    @Override
    public void configure(Map<String, ?> configs) {
        //ignore
    }
}
spring-boot apache-kafka spring-cloud-stream
1个回答
0
投票

您正在使用的自定义处理器正在使用RuntimeException方法抛出process。它没有被任何东西捕获。抛出该异常时,应用程序将直接退出。

您正在使用的生产异常处理程序在这里没有任何作用,因为您在这里没有生产任何东西。 Consumer不产生任何东西。如果您有使用某些东西的用例,则应改用java.util.funciton.Function

为了在此处解决此问题,当您在自定义处理器(EventProcessor)中处理记录时,如果遇到异常,则应捕获该异常并采取适当的措施。例如,这是一个模板:

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            try {
                // start processing
                // exception thrown

            }
            catch (Exception e){
                // Take the appropriate action
            }
        }

这样,当处理器中引发异常时,应用程序将不会终止。

© www.soinside.com 2019 - 2024. All rights reserved.