使用scala检查csv文件流上的文件

问题描述 投票:-1回答:1

我正在处理火花流,并且当新流文件每10分钟发出一次时不想处理旧文件:

val val1= spark  
.read //  
.option("header", "true")    
.option("schema", "true")    
.option("sep", ",")    
.csv(path_to_file).toDF().cache()  
val1.registerTempTable("test")

在创建数据帧后,我做了一些转换和处理检查点可以帮助我以及我在我的情况下如何使用

scala apache-spark bigdata cloudera hortonworks-sandbox
1个回答
1
投票

*****************解决方案*******************

val spark = SparkSession .builder .appName(“test”)。config(“spark.local”,“local [*]”)。getOrCreate()spark.sparkContext.setCheckpointDir(path_checkpoint)并在我调用checkpoint函数之后dataframe我指定了一个执行作业的触发器

   .writeStream
    .format("csv") 
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") 
    .option("checkpointLocation",CheckPoint)   
 .trigger(Trigger.ProcessingTime("180 seconds")) 
    .option("Path",Path )  
    .option("header", true)  
    .outputMode("Append")
    .queryName("test")
    .start()
© www.soinside.com 2019 - 2024. All rights reserved.