我正在尝试找到最有效的方法来遍历数据湖中的数千个文件,并合并所有与特定模式匹配的文件。我有成千上万个具有12种不同模式的文件。我正在尝试将具有相同架构的文件合并到一个数据框中(因此,我将有12个数据框)。现在,我想将文件名附加到每个模式的字段中。这是一个例子。
我在Google周围搜寻解决方案,似乎应该是:
import glob
dfABC = sc.union([proc(f) for f in glob.glob("rawdata/2019/*/ABC_Folder/ABC*.gz")])
# quick sanity check
print(dfABC.count(),len(dfABC.columns))
或者也许是这样。
dfCW = sc.textFile('rawdata/2019/*/ABC_Folder/ABC*'.join(files))
shape(dfCW)
我正在使用Azure Databricks和PySpark。
注意:在2019年之后,我有几个月又几天,所以类似:/ 2019/01/01 /
[我希望大部分时间循环浏览所有文件,但也可以选择逐月浏览,因为某些文件的内容会在年中更改。
我终于,终于,终于成功了。
val myDFCsv = spark.read.format("csv")
.option("sep","|")
.option("inferSchema","true")
.option("header","false")
.load("mnt/rawdata/2019/01/01/client/ABC*.gz")
myDFCsv.show()
myDFCsv.count()