我有一个包含两列的数据框--filepath(blobs的wasbs文件路径),字符串,并希望将每个字符串写入具有该文件名的单独blob。我怎样才能做到这一点?
你一次只能写一个wasb容器 - 不确定这是否是你问题的一部分,但我想澄清一种方式。此外,spark将文件写入目录,而不是单个文件。如果你想要完全满足你的要求,你将不得不重新分配到1个分区并按文件路径分区。
在该步骤之后,您将需要使用azure sdk重命名文件并将它们移动到父目录。
更新的答案:
我找到了一种使用dbutils.fs.put
实现这一目标的简单方法。您需要循环遍历DataFrame的每一行,为每一行调用dbutils.fs.put()。
假设您的输入文件(假设CSV)有两列,如下所示:
filepath, stringValue
wasbs://[email protected]/demo1.txt,"demo string 1"
wasbs://[email protected]/demo2.txt,"demo string 2"
wasbs://[email protected]/demo3.txt,"demo string 3"
wasbs://[email protected]/demo4.txt,"demo string 4"
wasbs://[email protected]/demo5.txt,"demo string 5"
您可以使用以下内容遍历输入DataFrame中的每一行:
df = spark.read.option("header", True).csv("wasbs://[email protected]/demo-data.csv")
rowList = df.rdd.collect()
for row in rowList:
dbutils.fs.put(str(row[0]), str(row[1]), True)
put方法将给定的String写入以UTF-8编码的文件,因此使用它可以循环遍历DataFrame中的每个记录,将第一列作为文件路径传递,第二列作为要写入的字符串内容到文件。
这也有将字符串写入单个文件的好处,因此您无需经历重命名和移动文件的过程。
老答案:
由于Spark的分布式特性,将DataFrame写入文件会导致创建一个包含多个文件的目录。您可以使用coalesce
强制处理单个工作人员和文件,其名称将以part-0000
开头。
免责声明:建议仅适用于小文件,因为较大的数据文件可能导致内存不足异常。
要完成您正在尝试的操作,您需要循环遍历DataFrame的每一行,为每行创建一个新的DataFrame,其中只包含要写入文件的字符串值。
假设您的输入文件(假设CSV)有两列,如下所示:
filepath, stringValue
wasbs://[email protected]/demo1,"demo string 1"
wasbs://[email protected]/demo2,"demo string 2"
wasbs://[email protected]/demo3,"demo string 3"
wasbs://[email protected]/demo4,"demo string 4"
wasbs://[email protected]/demo5,"demo string 5"
您可以使用以下内容遍历输入DataFrame中的每一行:
from pyspark.sql import *
from pyspark.sql.types import StringType
df = spark.read.option("header", True).csv("wasbs://[email protected]/demo-data.csv")
rowList = df.rdd.collect()
for row in rowList:
dfRow = spark.createDataFrame([str(row[1])], StringType())
dfRow.coalesce(1).write.mode("overwrite").text(row[0])
这将导致在名为demo1,demo2,demo3,demo4和demo5的Blob Storage帐户容器中创建目录。其中每个都包含多个文件。名称以part-0000
开头的每个目录中的文件是包含字符串值的文件。
如果您需要这些文件具有不同的名称,并且位于不同的位置,则可以使用dbutils.fs
方法来处理移动文件和执行重命名。如果需要,您还可以使用它来清除所创建的目录。