如何正常停止笔记本流作业?

问题描述 投票:0回答:1

我有一个流应用程序,正在运行到Databricks笔记本作业(https://docs.databricks.com/jobs.html)中。我希望能够使用stop()方法返回的StreamingQuery类的stream.start()方法来优雅地停止流作业。当然,这需要访问提到的流实例或访问正在运行的作业本身的上下文。在第二种情况下,代码如下所示:

spark.sqlContext.streams.get("some_streaming_uuid").stop()

以上代码应从其他笔记本作业中执行,尽管我无法找到一种方法来访问作业上下文并执行上述scala代码,但应将其称为stop_streaming_job。有什么办法可以用databricks笔记本来实现这一目标?

scala apache-spark spark-streaming databricks spark-notebook
1个回答
0
投票

这里是解决此问题的一种方法。我们使用databricks文件系统dbfs来停止流作业。第一部分request_stream_job_stop.scala是一个笔记本,它在预定义目录中创建并保存一个以流唯一ID命名的文件。第二部分监视相同的目录,并等待提到的文件出现。当文件出现时,它会执行streamQuery.stop(),它将正常停止流作业,并最终删除该文件。

这是将创建停止请求的第一部分:

// Databricks notebook source
// MAGIC %md
// MAGIC # Stop streaming job
// MAGIC This notebook will send a request for stopping a streaming job

// COMMAND ----------
dbutils.widgets.text("streamStopDir", "dbfs:/tmp/eneco.streaming.toon.eu/stop-streaming-job/")
dbutils.widgets.text("jobId", "")
dbutils.widgets.text("timeout", "10")

val streamStopDir = dbutils.widgets.get("streamStopDir")
val jobId =  dbutils.widgets.get("jobId")

val path = s"${streamStopDir}/${jobId}"
val timeout = dbutils.widgets.get("timeout").toInt

val pathExists = (path: String) => {
  try{
    dbutils.fs.ls(path).size > 0
  }
  catch{
    case _ : java.io.FileNotFoundException => false
  }
}

if(!pathExists(path)){
  println(s"Storing ${jobId} to the dir: ${streamStopDir}")
  dbutils.fs.put(path, java.time.LocalDateTime.now().toString)
}
else{
  throw new Exception(s"Stop file already exists for job:${jobId}")
}


var delay = 0
while( delay < timeout && pathExists(path) == true){
  Thread.sleep(1000L)
  delay += 1
}

if(delay != timeout && !pathExists(path))
  dbutils.notebook.exit("success")
else
  dbutils.notebook.exit("failed")

// COMMAND ----------

request_stream_job_stop笔记本需要以下参数:

  • streamStopDir:停止文件的存储目录
  • jobId:流作业的UUID
  • timeout:等待成功或不执行的时间,以秒为单位

执行逻辑:在jobId目录中创建并保存一个名为streamStopDir的新文件。等待直到流作业将新创建的文件消耗/删除或我们达到timeout阈值。终止并分别返回successfailed

另一方面,我们有流媒体工作。该程序包含一个Scala Future,它将等待直到出现停止文件:


val streamingQuery = spark
      .readStream
      .option("header", value = false)
      .option("delimiter", value = ";")
      .csv(s"s3://test_url/all/")
      .writeStream
      .trigger(Trigger.ProcessingTime(1L))
      .outputMode("append")
      .partitionBy("utc_date")
      .option("path", s"s3://test_url/temp_autodelete/test_data")
      .option("checkpointLocation", s"s3://test_url/spark-checkpoint/")
      .format("parquet")
      .start()

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

var pathExists = (path: String) => {
  try{
    dbutils.fs.ls(path).size > 0
  }
  catch{
    case _ : java.io.FileNotFoundException => false
  }
} : Boolean

val f: Future[Boolean] = Future {
  val stopperPath = s"${streamStopDir}/${streamingQuery.id.toString}"

  while (streamQuery.isActive && !pathExists(stopperPath)){
    val random: ThreadLocalRandom = ThreadLocalRandom.current()
    val r = random.nextLong(10, 100 + 1) // returns value between 10 and 100
    Thread.sleep(r)
  }

  if(pathExists(stopperPath)){
    streamingQuery.stop()
    dbutils.fs.rm(stopperPath)

    true
  }
  else
    false
}

var output = "success"
f onComplete {
  case Success(result : Boolean) => if (!result) {
    output = s"failure: file not found."
  }
  case Failure(t) => output = s"failure: ${t.getMessage}."
}

streamingQuery.awaitTermination()

执行逻辑:创建一个异步等待直到满足以下条件之一的未来:

  • 工作活跃
  • 在监视目录中找到停止文件

如果满足前述条件之一,则调用streamingQuery.stop()并删除停止文件。

PS:有人可以使用任何文件系统(例如S3,HDFS等),甚至可以使用基于队列的解决方案(例如Kafka,SQS,ZeroMQ]] >>

© www.soinside.com 2019 - 2024. All rights reserved.