我大约有2.5 k个JSON文件,每个JSON文件代表1行。使用这些文件,我需要执行一些非常简单的ETL并将它们移到我的datalake的curated
部分中。
我遍历数据湖并通过一个简单的.read
调用来调用JSON文件,我先定义了JSON模式。
然后执行ETL并尝试将这些文件写入数据湖的单独部分,但是写入部分极端慢,只花了15分钟就写了一个只有几百kb的文件?
rp = spark.read.json(paths, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())
for iter in iterable:
#do stuff
# filter my sparkDF with .filter
SparkDF_F = sparkDF.filter(...)
sparkDF_F.write('path/filename.parquet')
我尝试使用'OPTIMIZE'并在我的路径上将其调用
%sql
OPTIMIZE delta.'dbfs:/mnt/raw/data/table'
将引发以下错误。
Error in SQL statement: ParseException:
mismatched input 'dbfs:/mnt/raw/data/table' expecting {'SELECT', 'FROM', '
ADD', 'AS', 'TIMESTAMP', 'VERSION', 'ALL', 'ANY', 'DISTINCT',
'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER....
有人能够引导我了解我在这里的误解吗?
设置
两件事:
如果2.5k JSON文件存储在同一文件夹中。您可以使用相同的文件夹路径直接读取它们:
rp = spark.read.json(path_common,multiLine = True,schema = json_s).withColumn('path',F.input_file_name())>] >>
然后,您可以将rp.filter应用于整个数据帧,因为它只有一个(不需要对每个文件进行迭代)
希望这会有所帮助