我正在处理火花流,并且当新流文件每10分钟发出一次时不想处理旧文件:
val val1= spark
.read //
.option("header", "true")
.option("schema", "true")
.option("sep", ",")
.csv(path_to_file).toDF().cache()
val1.registerTempTable("test")
在创建数据帧后,我做了一些转换和处理检查点可以帮助我以及我在我的情况下如何使用
*****************解决方案*******************
val spark = SparkSession .builder .appName(“test”)。config(“spark.local”,“local [*]”)。getOrCreate()spark.sparkContext.setCheckpointDir(path_checkpoint)并在我调用checkpoint函数之后dataframe我指定了一个执行作业的触发器
.writeStream
.format("csv")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.option("checkpointLocation",CheckPoint)
.trigger(Trigger.ProcessingTime("180 seconds"))
.option("Path",Path )
.option("header", true)
.outputMode("Append")
.queryName("test")
.start()