Spark结构化流处理不会在Kafka偏移处重新启动

问题描述 投票:4回答:1

我们有一个运行时间很长的Spark结构化流查询,该查询正在从Kafka中读取,我们希望此查询在重新启动后从中断处恢复。但是,我们将startingOffsets设置为“ earliest”,并且重启后看到的是查询再次从Kafka主题的开头读取。

我们的基本查询如下:

  val extract = sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "server:port")
    .option("subscribe", "topic")
    .option("startingOffsets", "earliest")
    .load()

  val query: StreamingQuery = extract 
    .writeStream
    .option("checkpointLocation", s"/tmp/checkpoint/kafka/")
    .foreach(writer)
    .start()

我们看到正确创建了检查点目录,并在偏移文件中具有我们期望的偏移。

当我们重新启动时,我们会看到类似这样的消息:

25-07-2017 14:35:32 INFO  ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver

我们告诉查询从“ earliest”开始,但是文档说:

This only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off.

这是否意味着重启我们的应用程序会导致查询从中断处恢复?

Spark Structured Streaming不允许为Kafka设置“ group.id”。看到此:Note that the following Kafka params cannot be set and the Kafka source will throw an exception.

[我尝试添加queryName,以防该字符用于跨运行识别查询,但没有任何效果。

我们正在YARN上使用Spark 2.1。

关于为什么这行不通或我们做错了的任何想法?

更新日志:

From the Driver

From the Worker

scala apache-spark spark-structured-streaming
1个回答
0
投票

首先,为什么要说再次创建检查点目录。您是在初次运行后将其删除然后恢复吗?

因此,为清楚起见,当您首次启动查询时,将从头开始读取“ .option(“ startingOffsets”,“最早”)“设置。并考虑到出现了问题,并且流停止了。您对其进行了修复,然后再次启动流(不删除检查点目录),该流应从先前已停止的偏移处开始。

如果您已经删除了检查点目录,然后又恢复了流,则显然不会读取任何偏移量的历史记录(因为您删除了检查点),因此将从第一个(最早的)偏移量开始在kafka上可用。

© www.soinside.com 2019 - 2024. All rights reserved.