Checkpointing many streaming sources


I am using Zeppelin, and I read files from many sources with Spark Structured Streaming, like this:

    val var1 = spark
      .readStream
      .schema(var1_raw)
      .option("sep", ",")
      .option("mode", "PERMISSIVE")
      .option("maxFilesPerTrigger", 100)
      .option("treatEmptyValuesAsNulls", "true")
      .option("newFilesOnly", "true")
      .csv(path_var1)


    val checkpoint_var1 = var1
      .writeStream
      .format("csv")
      .option("checkpointLocation", path_checkpoint_var1)
      .option("path", path_checkpoint)
      .option("header", true)
      .outputMode("append")
      .queryName("var1_backup")
      .start().awaitTermination()


    val var2 = spark
      .readStream
      .schema(var2_raw)
      .option("sep", ",")
      .option("mode", "PERMISSIVE")
      .option("maxFilesPerTrigger", 100)
      .option("treatEmptyValuesAsNulls", "true")
      .option("newFilesOnly", "true")
      .csv(path_var2)

    val checkpoint_var2 = var2
      .writeStream
      .format("csv")
      .option("checkpointLocation", path_checkpoint_var2)
      .option("path", path_checkpoint_2)
      .option("header", true)
      .outputMode("append")
      .queryName("var2_backup")
      .start().awaitTermination()
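
Note that the first start().awaitTermination() call blocks, so as written the var2 query is only reached after the var1 query terminates. A minimal sketch of one common alternative, assuming the variable and path names from the code above: start both queries first, then wait on the session's StreamingQueryManager.

    // Start both queries without blocking on either one individually;
    // query names must be unique within the Spark session.
    val q1 = var1.writeStream
      .format("csv")
      .option("checkpointLocation", path_checkpoint_var1)
      .option("path", path_checkpoint)
      .queryName("var1_backup")
      .start()

    val q2 = var2.writeStream
      .format("csv")
      .option("checkpointLocation", path_checkpoint_var2)
      .option("path", path_checkpoint_2)
      .queryName("var2_backup")
      .start()

    // Block until any active query in the session terminates.
    spark.streams.awaitAnyTermination()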

When I rerun the job, I get this message: java.lang.IllegalArgumentException: Cannot start query with name var1_backup as a query with that name is already active
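
This error means the previous run's query is still registered as active in the same Spark session, which is common in Zeppelin because the session survives between paragraph runs. A minimal sketch of stopping any lingering queries by name before restarting, assuming the query names used above:

    // spark.streams is the session's StreamingQueryManager; stop any
    // still-active query with a conflicting name before starting a new one.
    spark.streams.active
      .filter(q => q.name == "var1_backup" || q.name == "var2_backup")
      .foreach(_.stop())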

scala apache-spark apache-spark-sql apache-zeppelin spark-structured-streaming
1 Answer
0 votes

***************** SOLUTION *******************

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    // Set the directory used by RDD/Dataset checkpointing.
    spark.sparkContext.setCheckpointDir(path_checkpoint)

After that, I call the checkpoint function on the DataFrame.
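
For reference, a minimal sketch of that call, assuming a hypothetical batch DataFrame named df (Dataset.checkpoint is an action, so it cannot be invoked on a streaming DataFrame directly, and it requires the checkpoint directory set via setCheckpointDir above):

    // Materialize df to the checkpoint directory and truncate its lineage;
    // eager = true performs the checkpoint immediately rather than lazily.
    val dfCheckpointed = df.checkpoint(eager = true)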
