Spark streaming Kafka 0.8 receiver-based stream: does KafkaUtils.createStream with watermarking save offsets to Zookeeper?

Question · 0 votes · 1 answer

I have to use

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

with the deprecated API:

val kafkaStream = KafkaUtils.createStream(streamingContext, zkArgs, consumerGroupId, topicMap)

kafkaStream.foreachRDD(rdd => {

  val sqlContext = new SQLContext(sc)

I read that watermarking is enabled manually like this:

//      enabling watermarking upon success
val sparkConf = new SparkConf()
  ....
  .set("zookeeper.hosts", zkArgs)
  .set("enable.auto.commit", "false")
  ....

df.withWatermark("eventTime", "10 minutes")
  .write .....

After tracing through the code, I ended up in classes like EventTimeWatermark...

Elsewhere, I read that I should persist the offsets myself:

def saveOffsets(zkClient:  ZkClient, zkPath: String, rdd: RDD[_]): Unit = {
  val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val offsetsRangesStr = offsetsRanges
    .map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
    .mkString(",")

  ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
}
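For completeness, the string that `saveOffsets` writes has to be read back and parsed on restart. Below is a minimal sketch of just the round-trip format, in pure Scala with no ZkClient involved; the `formatOffsets`/`parseOffsets` names are my own, not from any Kafka or Spark API:

```scala
// saveOffsets above serializes ranges as "partition:fromOffset,partition:fromOffset".
// These two hypothetical helpers show the round trip; a real readOffsets would first
// fetch the string from Zookeeper (e.g. via ZkUtils.readDataMaybeNull) before parsing.
def formatOffsets(ranges: Seq[(Int, Long)]): String =
  ranges.map { case (partition, fromOffset) => s"$partition:$fromOffset" }.mkString(",")

def parseOffsets(s: String): Map[Int, Long] =
  s.split(",").map { pair =>
    val Array(partition, offset) = pair.split(":")
    partition.toInt -> offset.toLong
  }.toMap
```

A map like this is what you would feed into `KafkaUtils.createDirectStream` via its `fromOffsets` parameter to resume a direct stream from self-managed offsets.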

So, does

df.withWatermark("eventTime", "10 minutes")
      .write

..... eventually update a watermark in Zookeeper? Or does some other mechanism handle this when Spark runs on a cluster?

apache-spark apache-kafka spark-streaming apache-zookeeper
1 Answer
1 vote

Watermarking is handled entirely inside Spark Streaming, so late messages picked up from Kafka are simply ignored by Spark; it does not persist anything to Zookeeper.

Kafka offsets are updated when the messages are read.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
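To see why watermarking never touches Zookeeper: `withWatermark` is just an event-time filter maintained inside Spark. A toy sketch of the semantics in plain Scala (no Spark; the names and the simplification are mine):

```scala
// Toy model of withWatermark("eventTime", "10 minutes"): Spark tracks the maximum
// event time it has seen and drops rows older than (maxEventTime - delay). Nothing
// here involves Kafka offsets or Zookeeper -- it is in-memory state inside Spark.
val delayMs: Long = 10 * 60 * 1000L

def survivesWatermark(eventTimeMs: Long, maxEventTimeSeenMs: Long): Boolean =
  eventTimeMs >= maxEventTimeSeenMs - delayMs
```

For example, an event arriving 15 minutes behind the maximum event time seen is dropped, while one 5 minutes behind is kept.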
