当使用spark-streaming时,如何通过自己保存多个分区的Kafka偏移量

问题描述 投票:3回答:1

我使用spark-streaming来读取kafka数据,并处理每一行

我用下面的内容来创建一个流媒体:

lines = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics,kafkaParams)
    );

然后我使用此代码处理来自kafka的数据

    lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
          OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
          OffsetRange[] range = new OffsetRange[1];
          range[0] = o;

          rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
          // get kafka offset
          OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
          // to cache line data
          List<String> jsonData = new ArrayList<>();
          // to read all line data
          while (partitionOfRecords.hasNext()) {
                ConsumerRecord<String, String> line = partitionOfRecords.next();
                jsonData.add(line.value());
          }
          // TODO  do my own bussiness from jsonData
          .......
          //  HOW can I commit kafka Offset Here??
          // this is a method to commit offset 
          ((CanCommitOffsets) lines.inputDStream()).commitAsync(range) 
      });
    });

我尝试了很多次,发现它有一些问题:

  1. 当其他分区失败时,如果我的数据处理成功,它是如何工作的?这意味着我的所有数据流程应该回来了吗?因为kafka偏移已经提交;
  2. 我已经运行了这段代码,然后我发现它真的执行commit操作是在下次执行rdd执行器的时候,这意味着如果进度或被杀死,下次我从Kafka读取的一些数据会加倍?
apache-kafka spark-streaming
1个回答
0
投票

当其他分区失败时,如果我的数据处理成功,它是如何工作的?这意味着我的所有数据流程应该回来了吗?因为kafka偏移已经提交

如果某个特定任务失败,Spark将尝试根据spark.task.maxFailures设置重新执行它。如果数字已过,则整个作业将失败。您需要确保如果commitAsync之前的部分失败,则不会提交偏移量。

我已经运行了这段代码,然后我发现它真的执行commit操作是在下次执行rdd执行器的时候,这意味着如果进度或被杀死,下次我从Kafka读取的一些数据会加倍?

是。如果作业在下一次批处理迭代之前被终止,Spark将尝试重新读取已经处理的数据。

© www.soinside.com 2019 - 2024. All rights reserved.