我们有一个运行时间很长的Spark结构化流查询,该查询正在从Kafka中读取,我们希望此查询在重新启动后从中断处恢复。但是,我们将startingOffsets
设置为“ earliest
”,并且重启后看到的是查询再次从Kafka主题的开头读取。
我们的基本查询如下:
val extract = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server:port")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.load()
val query: StreamingQuery = extract
.writeStream
.option("checkpointLocation", s"/tmp/checkpoint/kafka/")
.foreach(writer)
.start()
我们看到正确创建了检查点目录,并在偏移文件中具有我们期望的偏移。
当我们重新启动时,我们会看到类似这样的消息:
25-07-2017 14:35:32 INFO ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver
我们告诉查询从“ earliest
”开始,但是文档说:
这是否意味着重启我们的应用程序会导致查询从中断处恢复?
Spark Structured Streaming不允许为Kafka设置“ group.id
”。看到此:Note that the following Kafka params cannot be set and the Kafka source will throw an exception.
[我尝试添加queryName
,以防该字符用于跨运行识别查询,但没有任何效果。
我们正在YARN上使用Spark 2.1。
关于为什么这行不通或我们做错了的任何想法?
更新日志:
首先,为什么要说再次创建检查点目录。您是在初次运行后将其删除然后恢复吗?
因此,为清楚起见,当您首次启动查询时,将从头开始读取“ .option(“ startingOffsets”,“最早”)“设置。并考虑到出现了问题,并且流停止了。您对其进行了修复,然后再次启动流(不删除检查点目录),该流应从先前已停止的偏移处开始。
如果您已经删除了检查点目录,然后又恢复了流,则显然不会读取任何偏移量的历史记录(因为您删除了检查点),因此将从第一个(最早的)偏移量开始在kafka上可用。