Azure databricks spark - 写入blob存储

问题描述 投票:1回答:2

我有一个包含两列的数据框--filepath(blobs的wasbs文件路径),字符串,并希望将每个字符串写入具有该文件名的单独blob。我怎样才能做到这一点?

apache-spark pyspark databricks azure-blob-storage
2个回答
0
投票

你一次只能写一个wasb容器 - 不确定这是否是你问题的一部分,但我想澄清一种方式。此外,spark将文件写入目录,而不是单个文件。如果你想要完全满足你的要求,你将不得不重新分配到1个分区并按文件路径分区。

在该步骤之后,您将需要使用azure sdk重命名文件并将它们移动到父目录。


0
投票

更新的答案:

我找到了一种使用dbutils.fs.put实现这一目标的简单方法。您需要循环遍历DataFrame的每一行,为每一行调用dbutils.fs.put()。

假设您的输入文件(假设CSV)有两列,如下所示:

filepath, stringValue
wasbs://[email protected]/demo1.txt,"demo string 1"
wasbs://[email protected]/demo2.txt,"demo string 2"
wasbs://[email protected]/demo3.txt,"demo string 3"
wasbs://[email protected]/demo4.txt,"demo string 4"
wasbs://[email protected]/demo5.txt,"demo string 5"

您可以使用以下内容遍历输入DataFrame中的每一行:

df = spark.read.option("header", True).csv("wasbs://[email protected]/demo-data.csv")

rowList = df.rdd.collect()
for row in rowList:
  dbutils.fs.put(str(row[0]), str(row[1]), True)

put方法将给定的String写入以UTF-8编码的文件,因此使用它可以循环遍历DataFrame中的每个记录,将第一列作为文件路径传递,第二列作为要写入的字符串内容到文件。

这也有将字符串写入单个文件的好处,因此您无需经历重命名和移动文件的过程。

老答案:

由于Spark的分布式特性,将DataFrame写入文件会导致创建一个包含多个文件的目录。您可以使用coalesce强制处理单个工作人员和文件,其名称将以part-0000开头。

免责声明:建议仅适用于小文件,因为较大的数据文件可能导致内存不足异常。

要完成您正在尝试的操作,您需要循环遍历DataFrame的每一行,为每行创建一个新的DataFrame,其中只包含要写入文件的字符串值。

假设您的输入文件(假设CSV)有两列,如下所示:

filepath, stringValue
wasbs://[email protected]/demo1,"demo string 1"
wasbs://[email protected]/demo2,"demo string 2"
wasbs://[email protected]/demo3,"demo string 3"
wasbs://[email protected]/demo4,"demo string 4"
wasbs://[email protected]/demo5,"demo string 5"

您可以使用以下内容遍历输入DataFrame中的每一行:

from pyspark.sql import *
from pyspark.sql.types import StringType

df = spark.read.option("header", True).csv("wasbs://[email protected]/demo-data.csv")

rowList = df.rdd.collect()
for row in rowList:
  dfRow = spark.createDataFrame([str(row[1])], StringType())
  dfRow.coalesce(1).write.mode("overwrite").text(row[0])

这将导致在名为demo1,demo2,demo3,demo4和demo5的Blob Storage帐户容器中创建目录。其中每个都包含多个文件。名称以part-0000开头的每个目录中的文件是包含字符串值的文件。

如果您需要这些文件具有不同的名称,并且位于不同的位置,则可以使用dbutils.fs方法来处理移动文件和执行重命名。如果需要,您还可以使用它来清除所创建的目录。

© www.soinside.com 2019 - 2024. All rights reserved.