我有一个 600Mb 的 .CSV 文件,包含 650 万行和 10 列。这些列主要是 id 和 sum,但一列“type”只有 2 个唯一值:“online”和“offline”。 当我读取文件并将其保存为镶木地板(未经任何处理)时,它大约有 60Mb,但是当我读取文件,按“类型”和一些 ID 排序,然后将其保存为镶木地板时,文件大小约为 300Mb。
从逻辑上思考,它应该更小,因为很容易压缩排序列“类型”——比如“前 3,25M 离线,接下来 3,25M 在线”。所以结果真的很令人惊讶。
环境: Windows 10 Spark 和 Pyspark 版本:3.4.1 Hadoop版本:3.0.0 Java:v8,1.8.0_391-b13
那么,为什么排序会对文件大小产生负面影响,以及如何解决这个问题?
有一些事情可以尝试: 在 Spark 中保存镶木地板文件时,默认情况下使用
snappy
压缩方法。这有一定的优点,但也有一定的缺点(见下表)。这不是最节省空间的有效方法。
df.write.parquet('file_snappy.parquet') #equivalent to the below
df.write.option('compression', 'snappy').parquet('file_snappy.parquet')
但是还有其他压缩方法您可以尝试: 我建议您尝试以下所有方法,并检查哪一个较小(注意:有些像 brotli 需要安装编解码器。这似乎不那么简单)。然而,从here获取一个jar文件并将其包含在conf.SparkJars中可能会有所帮助。
df.write.option('compression', 'gzip').parquet('file_gzip.parquet')
df.write.option('compression', 'brotli').parquet('file_brotli.parquet')
df.write.option('compression', 'lz4').parquet('file_lz4.parquet')
df.write.option('compression', 'zstd').parquet('file_zstd.parquet')
一旦您根据
文档确定了最适合您的压缩方法,就可以使用
spark.sql.parquet.compression.codec
全局设置压缩方法
作为指南,此表可能会有所帮助。从看来,如果您设法安装它/让它工作,brotli将对您的用例有用!
除了编解码器之外,您还可以尝试以可能有益的方式进行排序。 Spark 中有一些关于高效且有用的排序的文档。我建议查看
repartitionAndSortWithinPartitions
、 sortWithinPartitions
以及相应的 documentation 中的其他内容。下面是按类型和 id 重新分区后使用 sortWithinPartitions
的示例。也许这会有所帮助(或没有帮助)。通过尝试不同的方法,您将找到适合您的特定数据的最佳解决方案。
repartitioned_sorted_df = df.repartitionByRange('type', 'id') \
.sortWithinPartitions(['type', 'id'])