我使用spark-streaming来读取kafka数据,并处理每一行
我用下面的内容来创建一个流媒体:
lines = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics,kafkaParams)
);
然后我使用此代码处理来自kafka的数据
lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
OffsetRange[] range = new OffsetRange[1];
range[0] = o;
rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
// get kafka offset
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
// to cache line data
List<String> jsonData = new ArrayList<>();
// to read all line data
while (partitionOfRecords.hasNext()) {
ConsumerRecord<String, String> line = partitionOfRecords.next();
jsonData.add(line.value());
}
// TODO do my own bussiness from jsonData
.......
// HOW can I commit kafka Offset Here??
// this is a method to commit offset
((CanCommitOffsets) lines.inputDStream()).commitAsync(range)
});
});
我尝试了很多次,发现它有一些问题:
当其他分区失败时,如果我的数据处理成功,它是如何工作的?这意味着我的所有数据流程应该回来了吗?因为kafka偏移已经提交
如果某个特定任务失败,Spark将尝试根据spark.task.maxFailures
设置重新执行它。如果数字已过,则整个作业将失败。您需要确保如果commitAsync
之前的部分失败,则不会提交偏移量。
我已经运行了这段代码,然后我发现它真的执行commit操作是在下次执行rdd执行器的时候,这意味着如果进度或被杀死,下次我从Kafka读取的一些数据会加倍?
是。如果作业在下一次批处理迭代之前被终止,Spark将尝试重新读取已经处理的数据。