优雅地停止结构化流查询

问题描述 投票:0回答:5

我正在使用 Spark 2.1 并尝试优雅地停止流查询。

StreamingQuery.stop()
一个优雅的停止,因为我没有在文档中看到有关此方法的任何详细信息:

void stop()
如果该查询正在运行,则停止执行。 此方法会阻塞,直到正在执行的线程停止为止。 自:2.0.0

在过去的流世界(DStreams)中,有一个选项可以停止流的执行,并可以选择确保所有接收到的数据已被处理:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
停止流的执行,并可以选择确保所有接收到的数据已被处理。

stopSparkContext
如果为 true,则停止关联的 SparkContext。无论是否执行此操作,底层 SparkContext 都会停止 StreamingContext 已启动。

stopGracefully
如果为 true,则通过等待所有接收到的数据的处理完成来优雅地停止

那么问题是如何优雅地停止结构化流查询?

apache-spark spark-structured-streaming
5个回答
7
投票

对于 PySpark 用户,这是 @ASe 答案的 Python 端口

# Helper method to stop a streaming query
def stop_stream_query(query, wait_time):
    """Stop a running streaming query"""
    while query.isActive:
        msg = query.status['message']
        data_avail = query.status['isDataAvailable']
        trigger_active = query.status['isTriggerActive']
        if not data_avail and not trigger_active and msg != "Initializing sources":
            print('Stopping query...')
            query.stop()
        time.sleep(0.5)

    # Okay wait for the stop to happen
    print('Awaiting termination...')
    query.awaitTermination(wait_time)

7
投票

如果没有更多记录可供消费,这样的代码可以帮助停止微批次流

def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long) {
   while (query.isActive) {
      val msg = query.status.message
      if (!query.status.isDataAvailable
          && !query.status.isTriggerActive
             && !msg.equals("Initializing sources")) {
      query.stop()
    }
    query.awaitTermination(awaitTerminationTimeMs)
  }
}

6
投票

如果“优雅地”意味着流式查询应该完成数据处理,那么

void stop()
将不会这样做。它只会等待,直到执行执行的线程停止(如文档中所述)。这并不意味着它将完成处理。

为此,我们需要让查询等待,直到查询的当前触发器完成。我们可以通过

StreamingQueryStatus
检查,如下所示:

while (query.status.isTriggerActive) {//什么都不做}

它将等待查询完成处理。然后我们就可以调用

query.stop()

希望对你有帮助!


3
投票

StreamingQuery.stop
不会优雅地停止查询,它会调用
sparkContext.cancelJobGroup(all jobs generated by streaming query)

因此,为了避免这种情况,并等待当前批次完成,我使用 https://gist.github.com/GrigorievNick/bf920e32f70cb1cf8308cd601e415d12 请注意,它仅适用于 MicroBatchExecution


1
投票

这取决于“优雅地”是什么意思:)

StreamingQuery 仅停止特定查询。它会等待,直到 MicroBatch 线程停止并准备好关闭源。这个“等待”意味着数据将被处理,然后线程将停止

© www.soinside.com 2019 - 2024. All rights reserved.