我尝试使用数据块中的 scala 合并 Datalake 中的两个文件,并使用以下代码将其保存回 Datalake:
val df =sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("adl://xxxxxxxx/Test/CSV")
df.coalesce(1).write.
format("com.databricks.spark.csv").
mode("overwrite").
option("header", "true").
save("adl://xxxxxxxx/Test/CSV/final_data.csv")
然而,文件 Final_data.csv 被保存为一个目录,而不是一个包含多个文件的文件,并且实际的 .csv 文件被保存为“part-00000-tid-dddddddddd-xxxxxxxxxx.csv”。
如何重命名该文件以便将其移动到另一个目录?
明白了。可以使用以下代码将其重命名并放置到另一个目标中。当前合并的文件也将被删除。
val x = "Source"
val y = "Destination"
val df = sqlContext.read.format("csv")
.option("header", "true").option("inferSchema", "true")
.load(x+"/")
df.repartition(1).write.
format("csv").
mode("overwrite").
option("header", "true").
save(y+"/"+"final_data.csv")
dbutils.fs.ls(x).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(y+"/"+"final_data.csv").filter(file=>file.name.startsWith("part-00000"))(0).path,y+"/"+"data.csv")
dbutils.fs.rm(y+"/"+"final_data.csv",true)
重命名Azure DataBricks中ADLS Gen2中存储的文件:我们可以使用重命名或复制方法来执行此操作。如果文件以part-0000开头或以.csv结尾,那么我们可以使用逻辑。重命名:data.csv
从 pyspark.sq1.functions 导入 col
source_path =“abfss://so[电子邮件受保护]/sample/final_data/”
new_name="abfss://source[电子邮件受保护]/sample/output/data.csv"
getname = dbutils.fs.ls(源路径)
df_filelist = Spark.createDataFrame(getname)
filename = df_filelist.filter(col("name").like("%.csv%")).select("name").collect)[0][0]
旧名称=源路径+文件名 dbutils.fs.mv(旧名称,新名称)
dbutils.fs.rm(source_path+'/',True)
Python
y = "dbfs:/mnt/myFirstMountPoint/apltperf/Shiv/Destination"
df = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(x+"/")
df.repartition(1).write.format("csv").mode("overwrite").save(y+"/"+"final_data.csv")
spark.conf.set('x', str(x)) spark.conf.set('y', str(y))
斯卡拉
var x=spark.conf.get("x")
var y=spark.conf.get("y") dbutils.fs.ls(x).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(y+"/"+"final_data.csv").filter(file=>file.name.startsWith("part-00000"))(0).path,y+"/"+"data.csv")
dbutils.fs.rm(y+"/"+"final_data.csv",true)