我一定会用
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
使用已弃用的功能
val kafkaStream = KafkaUtils.createStream(streamingContext, zkArgs, consumerGroupId, topicMap)
kafkaStream.foreachRDD(rdd => {
val sqlContext = new SQLContext(sc)
我读到手动使用水印这样做:
// enabling watermarking upon success
val sparkConf = new SparkConf()
....
.set("zookeeper.hosts", zkArgs)
.set("enable.auto.commit", "false")
....
df.withWatermark("eventTime", "10 minutes")
.write .....
课程跟踪后,我上了EventTimeWatermark等课程...
在另一个地方,我读到我应该自己写一些补偿:
def saveOffsets(zkClient: ZkClient, zkPath: String, rdd: RDD[_]): Unit = {
val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
.mkString(",")
ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
}
是吗?
df.withWatermark("eventTime", "10 minutes")
.write
.....最终更新Zookeeper中的水印?或者在集群上运行spark的另一种机制?
由于水印仅在Spark流中完成,因此从Kafka中挑选的延迟消息在Spark中被忽略。
读取消息时更新Kafka偏移量。