Spark - 处理后移动文件

问题描述 投票:0回答:1

我将 json 文件读入数据帧:

df = spark.read.option("multiline", "true").json(f"/mnt/bronze/{something}*")

然后我进行一些处理,然后将其写入增量表:

rows.write.format("delta").mode("overwrite").save(f"/mnt/silver/{something}")

我想在处理后移动 json 文件,这样我以后就不会再次重新处理它们。但我不知道如何移动它们。

我尝试在 rdd.foreach() 中使用 dbutils,但这是不可能的。我还在 rdd.foreach() 中尝试了shutil.move(),但由于我使用了blob存储,所以找不到我的文件。

apache-spark pyspark azure-blob-storage databricks delta-lake
1个回答
0
投票

让我提供一些解决方案。

我想 /mnt/bronze/{something} 是您着陆文件的暂存区域,并且您希望防止读取以前着陆的文件。

您当前笔记本的末尾有一些选项:

  1. 如果不再需要原始文件,请将其删除:
dbutils.fs.rm("/mnt/bronze/{something}*", True)

dbutils.fs.rm
是删除文件的函数,
"/mnt/bronze/{something}*"
是要删除的文件的路径和模式,第二个参数 True 表示删除是递归的(如果您确定删除,则可以选择)仅删除文件)。

  1. 如果您将来可能需要这些文件,只需将它们移动到临时目录,以便将来可以安全地删除它们
dbutils.fs.mv("/mnt/bronze/{something}*", "/mnt/tmp/")
© www.soinside.com 2019 - 2024. All rights reserved.