PySpark:将输入文件写入单独的输出文件而不进行重新分区

问题描述 投票:0回答:1

我有一系列非常大的每日gzip压缩文件。我正在尝试使用PySpark以Parquet格式重新保存S3中的所有文件供以后使用。

如果对于单个文件(例如,2012-06-01),我会:

dataframe = spark.read.csv('s3://mybucket/input/20120601.gz', schema=my_schema, header=True)
dataframe.write.parquet('s3://mybucket/output/20120601')

它可以工作,但由于gzip不可拆分,它可以在一台主机上运行,​​而且我没有使用集群的好处。

我尝试一次读取一大块文件,并使用partitionBy将输出写入这样的日常文件(例如,在一个月内阅读):

dataframe = spark.read.csv('s3://mybucket/input/201206*.gz', schema=my_schema, header=True)
dataframe.write.partitionBy('dayColumn').parquet('s3://mybucket/output/')

这次,单个文件在我想要的不同执行程序中读取,但执行程序稍后会死,并且进程将失败。我相信因为文件太大了,而且partitionBy以某种方式使用了不必要的资源(一个shuffle?),它会使任务崩溃。

我实际上并不需要重新划分我的数据帧,因为这只是一个1:1的映射。无论如何,是否要将每个单独的任务写入一个单独的,明确命名的镶木地板输出文件?

我在想类似的东西

def write_file(date):
    # get input/output locations from date
    dataframe = spark.read.csv(input_location, schema=my_schema, header=True)
    dataframe.write.parquet(output_location)
spark.sparkContext.parallelize(my_dates).for_each(write_file)

除非这不起作用,因为您无法将Spark会话广播到群集。有什么建议?

apache-spark pyspark parquet
1个回答
1
投票

将输入文件写入单独的输出文件而不重新分区

TL; DR这是您的代码已经在做的事情。

partitionBy导致不必要的shuffle

没有.DataFrameWriter.partitionBy根本没有洗牌。

它工作,但因为gzip不可拆分

您可以:

  • 完全降低压缩--Plaquet使用内部压缩。
  • bzip2一样使用可分割压缩。
  • 在提交作业之前将文件解压缩到临时存储。

如果您担心partitionBy使用的资源(它可能会为每个执行程序线程打开更多文件),您实际上可以随机改进以提高性能 - DataFrame partitionBy to a single Parquet file (per partition)。单个文件可能很多但是

dataframe \
    .repartition(n, 'dayColumn', 'someOtherColumn') \
    .write.partitionBy('dayColumn') \
    .save(...)

在哪里可以选择someOtherColumn来获得合理的基数,应该改进的东西。

© www.soinside.com 2019 - 2024. All rights reserved.