我有一个流应用程序,正在运行到Databricks笔记本作业(https://docs.databricks.com/jobs.html)中。我希望能够使用stop()
方法返回的StreamingQuery
类的stream.start()
方法来优雅地停止流作业。当然,这需要访问提到的流实例或访问正在运行的作业本身的上下文。在第二种情况下,代码如下所示:
spark.sqlContext.streams.get("some_streaming_uuid").stop()
以上代码应从其他笔记本作业中执行,尽管我无法找到一种方法来访问作业上下文并执行上述scala代码,但应将其称为stop_streaming_job
。有什么办法可以用databricks笔记本来实现这一目标?
这里是解决此问题的一种方法。我们使用databricks文件系统dbfs
来停止流作业。第一部分request_stream_job_stop.scala
是一个笔记本,它在预定义目录中创建并保存一个以流唯一ID命名的文件。第二部分监视相同的目录,并等待提到的文件出现。当文件出现时,它会执行streamQuery.stop()
,它将正常停止流作业,并最终删除该文件。
这是将创建停止请求的第一部分:
// Databricks notebook source
// MAGIC %md
// MAGIC # Stop streaming job
// MAGIC This notebook will send a request for stopping a streaming job
// COMMAND ----------
dbutils.widgets.text("streamStopDir", "dbfs:/tmp/eneco.streaming.toon.eu/stop-streaming-job/")
dbutils.widgets.text("jobId", "")
dbutils.widgets.text("timeout", "10")
val streamStopDir = dbutils.widgets.get("streamStopDir")
val jobId = dbutils.widgets.get("jobId")
val path = s"${streamStopDir}/${jobId}"
val timeout = dbutils.widgets.get("timeout").toInt
val pathExists = (path: String) => {
try{
dbutils.fs.ls(path).size > 0
}
catch{
case _ : java.io.FileNotFoundException => false
}
}
if(!pathExists(path)){
println(s"Storing ${jobId} to the dir: ${streamStopDir}")
dbutils.fs.put(path, java.time.LocalDateTime.now().toString)
}
else{
throw new Exception(s"Stop file already exists for job:${jobId}")
}
var delay = 0
while( delay < timeout && pathExists(path) == true){
Thread.sleep(1000L)
delay += 1
}
if(delay != timeout && !pathExists(path))
dbutils.notebook.exit("success")
else
dbutils.notebook.exit("failed")
// COMMAND ----------
request_stream_job_stop笔记本需要以下参数:
执行逻辑:在jobId
目录中创建并保存一个名为streamStopDir
的新文件。等待直到流作业将新创建的文件消耗/删除或我们达到timeout
阈值。终止并分别返回success
或failed
。
另一方面,我们有流媒体工作。该程序包含一个Scala Future,它将等待直到出现停止文件:
val streamingQuery = spark
.readStream
.option("header", value = false)
.option("delimiter", value = ";")
.csv(s"s3://test_url/all/")
.writeStream
.trigger(Trigger.ProcessingTime(1L))
.outputMode("append")
.partitionBy("utc_date")
.option("path", s"s3://test_url/temp_autodelete/test_data")
.option("checkpointLocation", s"s3://test_url/spark-checkpoint/")
.format("parquet")
.start()
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Success, Failure}
var pathExists = (path: String) => {
try{
dbutils.fs.ls(path).size > 0
}
catch{
case _ : java.io.FileNotFoundException => false
}
} : Boolean
val f: Future[Boolean] = Future {
val stopperPath = s"${streamStopDir}/${streamingQuery.id.toString}"
while (streamQuery.isActive && !pathExists(stopperPath)){
val random: ThreadLocalRandom = ThreadLocalRandom.current()
val r = random.nextLong(10, 100 + 1) // returns value between 10 and 100
Thread.sleep(r)
}
if(pathExists(stopperPath)){
streamingQuery.stop()
dbutils.fs.rm(stopperPath)
true
}
else
false
}
var output = "success"
f onComplete {
case Success(result : Boolean) => if (!result) {
output = s"failure: file not found."
}
case Failure(t) => output = s"failure: ${t.getMessage}."
}
streamingQuery.awaitTermination()
执行逻辑:创建一个异步等待直到满足以下条件之一的未来:
如果满足前述条件之一,则调用streamingQuery.stop()
并删除停止文件。
PS:有人可以使用任何文件系统(例如S3,HDFS等),甚至可以使用基于队列的解决方案(例如Kafka,SQS,ZeroMQ]] >>