下面的代码在数据处理后工作并提交偏移量。但问题是,在以下情况下处理重复:
消费者作业正在运行,hive表有0条记录,当前偏移量为(FORMAT- fromOffest,untilOffset,Difference):512 512 0
然后我制作了1000条记录,当它读取34条记录但未提交时,我将其杀死512 546 34
我看到,到目前为止,34个recs已经被加载到Hive表中
接下来,我重新启动了应用程序。
我看到它再次读取了34条记录(而不是读取1000-34 = 76个记录),虽然它已经处理过它们并加载到Hive 512 1512 1000然后几秒后它就会更新。 1512 1512 0 Hive现在有(34 + 1000 = 1034)
这会导致表中的重复记录(额外34)。如代码中所述,我只在处理/加载到Hive表后提交偏移量。
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception {
return tuple2.value();
}
});
records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
if(!rdd.isEmpty()) {
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
}
}
});
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
for (OffsetRange offset : offsetRanges) {
System.out.println(offset.fromOffset() + " " + offset.untilOffset()+ " "+offset.count());
}
}
});
javaStreamContext.start();
javaStreamContext.awaitTermination();
}
一般来说,在构建Spark Streaming作业时,您不应该关注应该在下游处理的重复项。不要误解我的意思,你想构建你的应用程序以防止重复,但是当灾难性的事情发生时你会得到重复,这就是为什么最好在以后管理它。
我看到的第一个问题是你要保存你的抵消。您应该在保存数据后立即保存它们,而不是在之后的方法中保存它们。当records.foreachRDD完成methodToSaveData时,它应该调用以保存偏移量。您可能需要重新构建映射记录的方式,以便获得偏移详细信息,但这是最佳位置。
records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
if(!rdd.isEmpty()) {
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
**{commit offsets here}**
}
}
});
也就是说,保存偏移的位置并不重要。如果作业在将数据写入配置单元并且在提交偏移范围之前被终止,则您将重新处理记录。有一些方法可以构建应用程序,因此它具有优雅的关闭钩子(Google it),它试图捕获kill命令并优雅地关闭它,但同样又容易受到应用程序被杀或崩溃的影响。如果运行执行程序的机器在保存到配置单元之后但在提交偏移量之前断电,则表示您已经重复了。如果应用程序被杀死-9(在Linux中),它不关心正常关闭,你将有重复。