我正在尝试在相当大的数据集上使用Spark的bucketBy功能。
dataframe.write()
.format("parquet")
.bucketBy(500, bucketColumn1, bucketColumn2)
.mode(SaveMode.Overwrite)
.option("path", "s3://my-bucket")
.saveAsTable("my_table");
问题是我的Spark集群有大约500个分区/任务/执行程序(不确定术语),所以最终得到的文件如下:
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet
part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet
part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
即500x500 = 250000个拼花实木复合地板文件! FileOutputCommitter
永远需要将其提交给S3。
像Hive中一样,有没有一种生成每个存储桶一个文件的方法?还是有更好的方法来解决这个问题?到目前为止,似乎我必须在降低群集的并行性(减少作者的数量)或降低木地板文件的并行性(减少存储桶的数量)之间进行选择。
谢谢
我正在尝试在相当大的数据集上使用Spark的bucketBy功能。 dataframe.write().format(“ parquet”).bucketBy(500,bucketColumn1,bucketColumn2).mode(SaveMode.Overwrite)....
这应该解决。