减少Spark作业从s3读取时的分区数

问题描述 投票:0回答:1

我有一个AWS Glue ETL作业,该作业从s3中读取实木复合地板,然后再次以实木复合地板格式将它们写入s3。

    val res = glueContext
      .getSourceWithFormat(
        connectionType = "s3",
        options = JsonOptions(
          Map(
            "path" -> sourcePath,
            "recurse" -> true,
            "groupFiles" -> "inPartition",
            "groupSize" -> 104857600,
            "useS3ListImplementation" -> true
          )),
        format = "parquet",
        transformationContext = "sourceDF"
      )
      .getDynamicFrame()

    println("Number of partitions " + res.getNumPartitions) //1780 partitions are created

    res
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy("orderId", "dt")
      .save(sinkPath)

在s3中,大约有5万个文件,大小约为670kb。

当我运行此作业时,我得到的分区数是1780。在输出中,我看到创建了1000多个文件。

所有这些输出的文件都在2mb左右。但是问题是,如果我们有很多小文件,那么从Athena那里将效率低下。

我可以在内存中进行分区,但是我发现很多数据都被重新整理了,并且看到这项工作的性能非常差。甚至合并效果也不佳。

所以我的问题是:我该如何减少它们。理想情况下,我想有10个分区。

谢谢

apache-spark amazon-s3 partitioning parquet aws-glue
1个回答
0
投票

分区的输入数据数:documentation解释了一些通过对输入文件进行分组来读取多个小文件的选项。如果您未使用分组配置的任何配置,则可能无法计算分区数。

输出文件的分区数:如果动态框架是从单个分区动态框架创建的,则它应具有相同数量的分区。否则,如果由于混洗(例如连接)而从多个DynamicFrames创建了数据,那么我建议在写入S3之前对数据进行重新分区。要设置一定数量的分区,则应运行

dynamicframe.repartition(#partitions)
© www.soinside.com 2019 - 2024. All rights reserved.