流应用程序在生产中推出,并且在10天后立即观察CustomProductionExceptionHandler中的错误/警告,该消息是属于较早日期窗口的过期事务。
FLOW:
输入主题->流应用程序(产生统计信息并在关闭日窗口后发出)->输出主题
生产者不断尝试将记录发布到OUTPUT主题,该主题已在较旧的窗口中到期,并将错误记录到CustomProductionExceptionHandler中。
我减小了批量大小并保留默认值,但此更改尚未推广到生产中。
CustomProductionExceptionHandler实现:为了避免流因NeworkException,TimeOutException而死。
[使用此实现,生产者不会重试,并且在发生任何异常的情况下它会继续..另一方面,返回FAIL ..流线程死亡并且不会自动重启。.需要建议。
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
String recordKey = new String(record.key());
String recordVal = new String(record.value());
String recordTopic = record.topic();
logger.error("Kafka message marked as processed although it failed. Message: [{}:{}], destination topic: [{}]", recordKey,recordVal,recordTopic,exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
例外:
2019-12-20 16:31:37.576错误com.jpmc.gpg.exception.CustomProductionExceptionHandler.handle(CustomProductionExceptionHandler.java:19)kafka-producer-network-thread | profile-day-summary-generator-291e69b1-5a3d-4d49-8797-252c2ae05607-StreamThread-19-producerid-Kafka消息标记为已处理,尽管失败。消息:[{“ statistics”:{}],目标主题:[OUTPUT-TOPIC]
org.apache.kafka.common.errors.TimeoutException:即将到期*自批处理创建以来,TOPIC的记录为1086149毫秒
试图获得以下问题的答案。
1)为什么生产者试图将较早的交易发布到已经关闭了当天窗口的OUTPUT主题?
示例-生产者试图发送12/09天的窗口交易,但当前打开的窗口是12/20
2)如果没有CustomProductionExceptionHandler->,流线程可能已经死亡。ProductionExceptionHandlerResponse.CONTINUE。在NetworkException或TimeoutException的情况下,我们是否可以使用Producer进行重试?然后继续而不是流线程死掉?在中指定ProductionExceptionHandlerResponse.CONTINUE的问题CustomProductionExceptionHandler是-如果发生任何异常,则跳过记录发布以输出主题并继续下一条记录。 无弹性。
1)在不知道您的程序做什么的情况下,实际上不可能回答这个问题。请注意,一般而言,Kafka Streams在事件时间上工作并处理乱序数据。
2]您可以通过在传递到Properties
的KafkaStreams
中指定相应的客户端配置,来配置Kafka Streams应用程序的所有内部使用的客户端(即,使用者,生产者,管理客户端和还原使用者)。如果为不同的客户端使用不同的配置,则可以为它们加上前缀,即producer.retries
而不是retries
。查看文档以获取更多详细信息:https://docs.confluent.io/current/streams/developer-guide/config-streams.html#ak-consumers-producer-and-admin-client-configuration-parameters