处理并忽略 Kafka Streams 中的 UNKNOWN_TOPIC_OR_PARTITION 错误

问题描述 投票:0回答:1

我正在使用 Kafka Streams 应用程序,我们使用基于消息头的动态主题确定。在我们的设置中,在应用程序运行时删除主题是正常的。已删除主题的消息偶尔可能仍会到达,但我想忽略它们。然而,即使只收到一条不存在主题的消息,我也会遇到无限循环的错误:

[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 74 : {test1=UNKNOWN_TOPIC_OR_PARTITION}

org.apache.kafka.common.errors.TimeoutException: Topic test1 not present in metadata after 60000 ms.

[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 79 : {test1=UNKNOWN_TOPIC_OR_PARTITION}

这种错误的无限循环本质上会导致应用程序停止工作。如何配置我的 Kafka Streams 应用程序以忽略已删除主题的消息而不进入无限循环的错误?有办法处理这种情况吗? 这是我的应用程序代码的简化示例:

StreamsBuilder builder = new StreamsBuilder();
List<String> dynamicTopics = List.of("good_topic", "deleted_topic");
builder.stream("source_topic").to((k, v, c) -> dynamicTopics.get(new Random().nextInt(dynamicTopics.size()))); //in real application from header
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Automatic topic creation is disabled. 

我尝试了以下方法来处理并忽略该错误:

  1. 使用 KafkaAdmin:但是,在检查现有主题之间,可以删除主题,但这并不能解决问题。

  2. 设置UncaughtExceptionHandler:

streams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler() {
    @Override
    public StreamThreadExceptionResponse handle(Throwable throwable) {
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
});

但是代码甚至没有到达这个处理程序。

  1. 设置 ProductionExceptionHandler:
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          CustomProductionExceptionHandler.class.getName());

同样,代码未到达此处理程序。

  1. 设置生产者拦截器:
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), ErrorInterceptor.class.getName());

代码到达此拦截器,但我无法从这里解决问题。

  1. 配置生产者属性:
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, "5000"); 
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "8000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "0");
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 0);

我尝试调整这些生产者属性,但 Kafka Streams 仍然尝试无限期地处理错误

java apache-kafka apache-kafka-streams
1个回答
0
投票

目前还没有办法做你想做的事。我确实和一位同事深入研究了代码(感谢安德鲁!),生产者在这种情况下返回一个

TimeoutException
,什么是
RetriableException
,因此 KafkaStreams 不会调用生产异常处理程序(这是你可以吞下的唯一地方)错误)但是,好吧,重试。对于一般情况,这种行为是有意义的(KafkaStreams 尝试在内部处理尽可能多的错误),但对于您的场景,您手头有一个有点“奇怪”的极端情况,并且模式会中断。

生产者在这里返回一个可重试的异常,这是一个有点奇怪的极端情况;丢失的元数据在大多数情况下是可以重试的,所以它并不是完全错误的,但是对于不存在的主题,它并不总是正确的(问题是真的,生产者无法区分这两种情况......)

您能否提交一份有关此问题的 Jira 票证,以便我们可以改善这种情况。您的用例有些不标准,因此过去没有人考虑过它。

© www.soinside.com 2019 - 2024. All rights reserved.