在Pyspark Databricks中处理1000个JSON文件

问题描述 投票:0回答:1

我大约有2.5 k个JSON文件,每个JSON文件代表1行。使用这些文件,我需要执行一些非常简单的ETL并将它们移到我的datalake的curated部分中。

我遍历数据湖并通过一个简单的.read调用来调用JSON文件,我先定义了JSON模式。

然后执行ETL并尝试将这些文件写入数据湖的单独部分,但是写入部分极端慢,只花了15分钟就写了一个只有几百kb的文件?

rp  = spark.read.json(paths, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())

for iter in iterable:
    #do stuff
    # filter my sparkDF with .filter
    SparkDF_F = sparkDF.filter(...)
    sparkDF_F.write('path/filename.parquet')

我尝试使用'OPTIMIZE'并在我的路径上将其调用

%sql
OPTIMIZE delta.'dbfs:/mnt/raw/data/table'

将引发以下错误。

Error in SQL statement: ParseException: 
mismatched input 'dbfs:/mnt/raw/data/table' expecting {'SELECT', 'FROM', '
ADD', 'AS', 'TIMESTAMP', 'VERSION', 'ALL', 'ANY', 'DISTINCT', 
'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER.... 

有人能够引导我了解我在这里的误解吗?

设置

  • Azure Databricks
  • 6.0
  • 火花2.4
  • Python 3.6
  • 具有12个核心的42GB群集。
  • 4个节点
  • Azure Gen1 DataLake。
python azure pyspark azure-databricks
1个回答
0
投票

两件事:

  1. 如果2.5k JSON文件存储在同一文件夹中。您可以使用相同的文件夹路径直接读取它们:

    rp = spark.read.json(path_common,multiLine = True,schema = json_s).withColumn('path',F.input_file_name())>] >>

    然后,您可以将rp.filter应用于整个数据帧,因为它只有一个(不需要对每个文件进行迭代)

    1. 关于Delta的文档,您只能优化表(存储在dbfs中),而不能直接优化。因此,您可以使用dbfs中指向的目录创建表,并按照文档中的建议使用优化:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

    希望这会有所帮助

© www.soinside.com 2019 - 2024. All rights reserved.