df_crimes_dates_formated = spark.sql("""
    SELECT CONCAT(SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) AS DATES,
           Primary_Type,
           COUNT(1) AS COUNT
    FROM crimes_data
    GROUP BY CONCAT(SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)), Primary_Type
    ORDER BY CONCAT(SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)), COUNT(1) DESC
""")
df_crimes_dates_formated.write.save('hdfs:///user/maria_dev/crimes/monthly_crimes/')
You can use .repartition() (or) .coalesce() to control the number of files written to HDFS.

With .repartition(n) the data is shuffled into exactly n partitions, and Spark writes one part file per partition. You can work out how many rows each partition (i.e. each output file) should hold for the file size you want, then divide the dataframe count by that row count to decide the number of partitions dynamically.

.coalesce(n) can only reduce the partition count, since it merges existing partitions instead of performing the full shuffle that .repartition() does.
#to get the number of partitions of the dataframe;
#Spark creates one part file per partition
>>> df_crimes_dates_formated.rdd.getNumPartitions()

#create 10 part files in HDFS
>>> df_crimes_dates_formated.repartition(10).write.save('hdfs:///user/maria_dev/crimes/monthly_crimes/')
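For comparison, here is a minimal sketch of the coalesce route, assuming you want a single output file; the .mode('overwrite') call is an assumption added so the example can be rerun against the same path:

#merge all partitions into one without a full shuffle and write a single part file;
#every row is funneled through one task, so this only suits reasonably small dataframes
>>> df_crimes_dates_formated.coalesce(1).write.mode('overwrite').save('hdfs:///user/maria_dev/crimes/monthly_crimes/')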
Calculating the number of partitions dynamically:

df.count()                               #3

#required rows for each partition (one output file per partition)
rows = 1
par = df.count() / rows
#guard against a partition count of zero when the dataframe is small
partitions = int(par) if par >= 1 else 1

#repartition with the calculated partitions value
df.repartition(partitions).rdd.getNumPartitions()   #3

In addition, from Spark 2.2 onward, if the dataframe has one partition and you want to control the number of rows written to each file, use the maxRecordsPerFile option.
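A minimal sketch of that option; the 1000-row cap and the reuse of the crimes path are assumptions for illustration:

#cap each output file at 1000 rows; Spark rolls over to a new part file
#once the limit is reached (available from Spark 2.2)
df.write.option('maxRecordsPerFile', 1000).save('hdfs:///user/maria_dev/crimes/monthly_crimes/')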