Spark:内存繁重的连接操作的最佳实践

问题描述 投票:0回答:1

我有一个火花程序,涉及大型Hive表的连接操作(数百万行,数百列)。在这些连接期间使用的内存非常高。我想了解在YARN上的Spark中处理这种情况的最佳方法,即作业将成功完成而不会出现内存错误。该集群由7名工人组成,每个工作人员有110 GB的RAM和16个核心。请考虑以下scala代码:

object Model1Prep {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName("Modello1_Spark")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        conf.set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
        val sc = new SparkContext(conf)
        val hc = new HiveContext(sc)
        import hc.implicits._


        hc.sql("SET hive.exec.compress.output=true")
        hc.sql("SET parquet.compression=SNAPPY")
        hc.sql("SET spark.sql.parquet.compression.codec=snappy")


        // loading tables on dataframes
        var tableA = hc.read.table("TA")
        var tableB = hc.read.table("TB")
        var tableC = hc.read.table("TC")
        var tableD = hc.read.table("TD")


        // registering tables
        tableA.registerTempTable("TA")
        tableB.registerTempTable("TB")
        tableC.registerTempTable("TC")
        tableD.registerTempTable("TD")


        var join1 = hc.sql("""
            SELECT 
                [many fields]
            FROM TA a 
            JOIN TB b ON a.field = b.field      
            LEFT JOIN TC c ON a.field = c.field         
            WHERE [conditions]
        """)


        var join2 = hc.sql("""
            SELECT 
                [many fields]
            FROM TA a 
            LEFT JOIN TD d ON a.field = d.field
            WHERE [conditions]
        """)


        // [other operations]


        sc.close()
    }
}

考虑到连接操作在内存上非常昂贵,我最好的选择是什么?我知道数据帧可以在内存和磁盘上保留,可能使用序列化在内存中更紧凑,代价是反序列化的处理时间更慢(更多关于herehere)。从上面的代码中,表TA在两个连接中都使用,因此保留它是有意义的:

    //[...]        

    // persisting
    tableA.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

    // registering tables
    tableA.registerTempTable("TA")
    tableB.registerTempTable("TB")
    tableC.registerTempTable("TC")
    tableD.registerTempTable("TD")

    //[...]

我是否也应该以同样的方式坚持其他表格?或者还有其他事情可以使这些代码顺利运行并完成吗?

scala apache-spark hadoop pyspark yarn
1个回答
0
投票

如果你知道你正在加入哪个字段并且它始终是相同的字段,那么,as this SO answer suggests,对连接的表使用相同的分区器。

© www.soinside.com 2019 - 2024. All rights reserved.