我有一系列非常大的每日gzip压缩文件。我正在尝试使用PySpark以Parquet格式重新保存S3中的所有文件供以后使用。
如果对于单个文件(例如,2012-06-01),我会:
dataframe = spark.read.csv('s3://mybucket/input/20120601.gz', schema=my_schema, header=True)
dataframe.write.parquet('s3://mybucket/output/20120601')
它可以工作,但由于gzip不可拆分,它可以在一台主机上运行,而且我没有使用集群的好处。
我尝试一次读取一大块文件,并使用partitionBy将输出写入这样的日常文件(例如,在一个月内阅读):
dataframe = spark.read.csv('s3://mybucket/input/201206*.gz', schema=my_schema, header=True)
dataframe.write.partitionBy('dayColumn').parquet('s3://mybucket/output/')
这次,单个文件在我想要的不同执行程序中读取,但执行程序稍后会死,并且进程将失败。我相信因为文件太大了,而且partitionBy以某种方式使用了不必要的资源(一个shuffle?),它会使任务崩溃。
我实际上并不需要重新划分我的数据帧,因为这只是一个1:1的映射。无论如何,是否要将每个单独的任务写入一个单独的,明确命名的镶木地板输出文件?
我在想类似的东西
def write_file(date):
# get input/output locations from date
dataframe = spark.read.csv(input_location, schema=my_schema, header=True)
dataframe.write.parquet(output_location)
spark.sparkContext.parallelize(my_dates).for_each(write_file)
除非这不起作用,因为您无法将Spark会话广播到群集。有什么建议?
将输入文件写入单独的输出文件而不重新分区
TL; DR这是您的代码已经在做的事情。
partitionBy导致不必要的shuffle
没有.DataFrameWriter.partitionBy
根本没有洗牌。
它工作,但因为gzip不可拆分
您可以:
bzip2
一样使用可分割压缩。如果您担心partitionBy
使用的资源(它可能会为每个执行程序线程打开更多文件),您实际上可以随机改进以提高性能 - DataFrame partitionBy to a single Parquet file (per partition)。单个文件可能很多但是
dataframe \
.repartition(n, 'dayColumn', 'someOtherColumn') \
.write.partitionBy('dayColumn') \
.save(...)
在哪里可以选择someOtherColumn
来获得合理的基数,应该改进的东西。