Kafka流式传输-TimeoutException:即将到期的* TOPIC记录*自创建批处理以来已过去* ms

问题描述 投票:0回答:1

流应用程序在生产中推出,并且在10天后立即观察CustomProductionExceptionHandler中的错误/警告,该消息是属于较早日期窗口的过期事务。

FLOW:

输入主题->流应用程序(产生统计信息并在关闭日窗口后发出)->输出主题

生产者不断尝试将记录发布到OUTPUT主题,该主题已在较旧的窗口中到期,并将错误记录到CustomProductionExceptionHandler中。

我减小了批量大小并保留默认值,但此更改尚未推广到生产中。

CustomProductionExceptionHandler实现:为了避免流因NeworkException,TimeOutException而死。

[使用此实现,生产者不会重试,并且在发生任何异常的情况下它会继续..另一方面,返回FAIL ..流线程死亡并且不会自动重启。.需要建议。

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        String recordKey = new String(record.key());
        String recordVal = new String(record.value());
        String recordTopic = record.topic();
        logger.error("Kafka message marked as processed although it failed. Message: [{}:{}], destination topic: [{}]", recordKey,recordVal,recordTopic,exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

例外:

2019-12-20 16:31:37.576错误com.jpmc.gpg.exception.CustomProductionExceptionHandler.handle(CustomProductionExceptionHandler.java:19)kafka-producer-network-thread | profile-day-summary-generator-291e69b1-5a3d-4d49-8797-252c2ae05607-StreamThread-19-producerid-Kafka消息标记为已处理,尽管失败。消息:[{“ statistics”:{}],目标主题:[OUTPUT-TOPIC]

org.apache.kafka.common.errors.TimeoutException:即将到期*自批处理创建以来,TOPIC的记录为1086149毫秒

试图获得以下问题的答案。

1)为什么生产者试图将较早的交易发布到已经关闭了当天窗口的OUTPUT主题?

示例-生产者试图发送12/09天的窗口交易,但当前打开的窗口是12/20

2)如果没有CustomProductionExceptionHandler->,流线程可能已经死亡。ProductionExceptionHandlerResponse.CONTINUE。在NetworkException或TimeoutException的情况下,我们是否可以使用Producer进行重试?然后继续而不是流线程死掉?在中指定ProductionExceptionHandlerResponse.CONTINUE的问题CustomProductionExceptionHandler是-如果发生任何异常,则跳过记录发布以输出主题并继续下一条记录。 无弹性。

apache-kafka-streams kafka-producer-api
1个回答
0
投票

1)在不知道您的程序做什么的情况下,实际上不可能回答这个问题。请注意,一般而言,Kafka Streams在事件时间上工作并处理乱序数据。

2]您可以通过在传递到PropertiesKafkaStreams中指定相应的客户端配置,来配置Kafka Streams应用程序的所有内部使用的客户端(即,使用者,生产者,管理客户端和还原使用者)。如果为不同的客户端使用不同的配置,则可以为它们加上前缀,即producer.retries而不是retries。查看文档以获取更多详细信息:https://docs.confluent.io/current/streams/developer-guide/config-streams.html#ak-consumers-producer-and-admin-client-configuration-parameters

© www.soinside.com 2019 - 2024. All rights reserved.