在S3中将每个分区数据写入单个文件中

问题描述 投票:0回答:2

我们有一个用例,我们希望用列值对数据帧进行分区,然后将每个分区写入单个文件。我做了同样的事情:

val df = spark.read.format("csv").load("hdfs:///tmp/PartitionKeyedDataset.csv")

df.repartition($"_c1")

df.rdd.saveAsTextFile("s3://dfdf/test1234")

当我做:

df.rdd.partitions.size 

我只获得了62个分区。但是,该列的不同值是10,214(通过运行df.select(“_ c1”)得到它.expare.count)

我不能用:

df.write.partitionBy("_c1").save("s3://dfdf/test123")

因为这会在目标中创建具有分区名称的文件夹。我们不希望这样。我们只想要转储文件。

scala apache-spark
2个回答
0
投票

我犯了一个不使用新变量的愚蠢错误。因此,我看到相同数量的分区。以下是更新的代码:

val df = spark.read.format("csv").load("hdfs:///tmp/PartitionKeyedDataset.csv")

df.repartition($"_c1")

df.rdd.saveAsTextFile("s3://dfdf/test1234")

默认情况下,重新分区只会创建200个分区,因为spark.sql.shuffle.partitions的默认值为200.我已将此值设置为我想要分区的列的唯一值数。

spark.conf.set("spark.sql.shuffle.partitions", "10214")

在此之后,我获得了10214个分区,并且写入操作在S3中创建了10214个文件。


0
投票

您需要将新数据帧分配给变量并使用它。目前在您的代码中,repartition部分实际上并没有做任何事情。

val df = spark.read.format("csv").load("hdfs:///tmp/PartitionKeyedDataset.csv")
val df2 = df.repartition($"_c1")
df2.rdd.saveAsTextFile("s3://dfdf/test1234")

尽管可以更改spark.sql.shuffle.partitions设置,但这并不灵活。

© www.soinside.com 2019 - 2024. All rights reserved.