当我将Sparksql查询产生的DataFrame保存到HDFS中时,它将生成大量零件文件,每个零件文件的大小为1.4 KB。有没有一种方法可以增加文件的大小,因为每个零件文件都包含大约2条记录。
df_crimes_dates_formated = spark.sql('SELECT CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) AS DATES , Primary_Type , COUNT(1) AS COUNT FROM crimes_data Group By CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) , Primary_Type ORDER BY CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) , COUNT(1) DESC' )
df_crimes_dates_formated.write.save('hdfs:///user/maria_dev/crimes/monthly_crimes/')
您可以根据用例使用.repartition()
(or) .coalesce()
来控制HDFS中的文件数。
.repartition()
.coalesce()
您可以得出每个分区将具有的行数,因此将提供所需的文件大小,然后将其除以数据帧计数以动态确定分区数。
#to get number of partitions of dataframe, spark creates part files depends on number of partitions in dataframe
>>> df_crimes_dates_formated.rdd.getNumPartitions()
#create 10 part files in HDFS
>>> df_crimes_dates_formated.repartition(10).write.save('hdfs:///user/maria_dev/crimes/monthly_crimes/')
另外:
从Spark-2.2开始,如果我们在数据帧中有1个分区,并且控制要写入文件的行数,请使用Caluculating number of partitons dynamically:
选项。
df.count()
#3
#req rows for each partition
rows=1
par=df.count()/rows
partitions=int('1' if par <= 0 else par)
#repartition with partitions value
df.repartition(partitions).rdd.getNumPartitions()
#3