我将 json 文件读入数据帧:
df = spark.read.option("multiline", "true").json(f"/mnt/bronze/{something}*")
然后我进行一些处理,然后将其写入增量表:
rows.write.format("delta").mode("overwrite").save(f"/mnt/silver/{something}")
我想在处理后移动 json 文件,这样我以后就不会再次重新处理它们。但我不知道如何移动它们。
我尝试在 rdd.foreach() 中使用 dbutils,但这是不可能的。我还在 rdd.foreach() 中尝试了shutil.move(),但由于我使用了blob存储,所以找不到我的文件。
让我提供一些解决方案。
我想 /mnt/bronze/{something} 是您着陆文件的暂存区域,并且您希望防止读取以前着陆的文件。
您当前笔记本的末尾有一些选项:
dbutils.fs.rm("/mnt/bronze/{something}*", True)
dbutils.fs.rm
是删除文件的函数,"/mnt/bronze/{something}*"
是要删除的文件的路径和模式,第二个参数 True 表示删除是递归的(如果您确定删除,则可以选择)仅删除文件)。
dbutils.fs.mv("/mnt/bronze/{something}*", "/mnt/tmp/")