How to avoid using collect in a Scala Spark RDD?


I have a list from which I must create a Map for further use. I am using an RDD, but when I call collect(), the job fails on the cluster. Any help is appreciated.

Please help. Below is the sample code going from the List to rdd.collect. I have to use this Map data afterwards, but how can I use it without collecting it?

This code creates a Map from the RDD (list) data. List format -> ((asdfg/1234/wert, asdf)

//List data used to create the Map
val listData = methodToGetListData(ListData).toList

//Creating an RDD from the list above
val rdd = sparkContext.makeRDD(listData)

implicit val formats = Serialization.formats(NoTypeHints)

//Group entries by their extracted path, then build one Map per group
val res = rdd
  .map(map => (getRPath(map._1), getAttribute(map._1), map._2))
  .groupBy(_._1)
  .map(tuple => {
    Map(
      "P_Id" -> "1234",
      "R_Time" -> "27-04-2020",
      "S_Time" -> "27-04-2020",
      "r_path" -> tuple._1,
      "S_Tag" -> "12345",
      tuple._1 -> tuple._2.map(a => (a._2, a._3)).toMap
    )
  })

//This is the call that fails on the cluster when the data is large
res.collect()
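For context, getRPath and getAttribute are not shown in the question. A minimal sketch of what they might do, assuming each key looks like asdfg/1234/wert with the path in the leading segments and the attribute name in the last one (both helpers here are hypothetical reconstructions, not from the question):

// Hypothetical reconstructions of the helpers omitted from the question,
// assuming keys of the form "asdfg/1234/wert".
def getRPath(key: String): String = key.substring(0, key.lastIndexOf("/"))      // "asdfg/1234"
def getAttribute(key: String): String = key.substring(key.lastIndexOf("/") + 1) // "wert"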
scala apache-spark rdd persist collect
1 Answer

Q: How can it be used without collecting?


Answer: collect() is the bottleneck. It moves the entire dataset to the driver node; if the data is huge, never do that.
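To make that concrete, here is a sketch of the usual driver-safe alternatives, reusing the res RDD from the question; the output path is hypothetical:

// take(n) pulls only a bounded sample to the driver, never the full dataset.
val sample = res.take(10)

// saveAsTextFile writes the result out from the executors, so nothing
// large ever lands on the driver.
res.saveAsTextFile("hdfs:///tmp/res") // hypothetical output path

// foreachPartition lets each executor consume its own slice directly,
// e.g. to push records to an external store.
res.foreachPartition { partition =>
  partition.foreach(record => println(record)) // replace with a real sink
}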


I don't know exactly what the use case for preparing the map is, but it can be achieved with a built-in Spark API, namely a collectionAccumulator. In detail:

collectionAccumulator[scala.collection.mutable.Map[String, String]]
Let's assume this is your sample dataframe, from which you want to build the map.

+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|Item_Id|Parent_Id|object_class_instance|Received_Time|CablesName|CablesStatus|CablesHInfoID|CablesIndex|object_class|ServiceTag|Scan_Time|relation_tree                  |
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|-0909  |1234     |Cables-1             |23-12-2020   |LC        |Installed   |ABCD1234     |0          |Cables      |ASDF123   |12345    |Start~>HInfo->Cables->Cables-1 |
|-09091 |1234111  |Cables-11            |23-12-2022   |LC1       |Installed1  |ABCD12341    |0          |Cables1     |ASDF1231  |123451   |Start~>HInfo->Cables->Cables-11|
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+

From this you want to build the map (a nested map, prefixed in your example with the nested-map key names).

Below is the complete example; take a look and adapt it accordingly.

package examples

import org.apache.log4j.Level

object GrabMapbetweenClosure extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  var mutableMapAcc = spark.sparkContext.collectionAccumulator[scala.collection.mutable.Map[String, String]]("mutableMap")

  val df = Seq(
    ("-0909", "1234", "Cables-1", "23-12-2020", "LC", "Installed", "ABCD1234"
      , "0", "Cables", "ASDF123", "12345", "Start~>HInfo->Cables->Cables-1")
    , ("-09091", "1234111", "Cables-11", "23-12-2022", "LC1", "Installed1", "ABCD12341"
      , "0", "Cables1", "ASDF1231", "123451", "Start~>HInfo->Cables->Cables-11")
  ).toDF("Item_Id", "Parent_Id", "object_class_instance", "Received_Time", "CablesName", "CablesStatus"
    , "CablesHInfoID", "CablesIndex", "object_class", "ServiceTag", "Scan_Time", "relation_tree")

  df.show(false)

  df.foreachPartition { partition => // for performance sake I used foreachPartition
    partition.foreach { record =>
      mutableMapAcc.add(scala.collection.mutable.Map(
        "Item_Id" -> record.getAs[String]("Item_Id")
        , "CablesStatus" -> record.getAs[String]("CablesStatus")
        , "CablesHInfoID" -> record.getAs[String]("CablesHInfoID")
        , "Parent_Id" -> record.getAs[String]("Parent_Id")
        , "CablesIndex" -> record.getAs[String]("CablesIndex")
        , "object_class_instance" -> record.getAs[String]("object_class_instance")
        , "Received_Time" -> record.getAs[String]("Received_Time")
        , "object_class" -> record.getAs[String]("object_class")
        , "CablesName" -> record.getAs[String]("CablesName")
        , "ServiceTag" -> record.getAs[String]("ServiceTag")
        , "Scan_Time" -> record.getAs[String]("Scan_Time")
        , "relation_tree" -> record.getAs[String]("relation_tree")
      ))
    }
  }
  println("FinalMap : " + mutableMapAcc.value.toString)
}
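One note on this pattern: foreachPartition is an action, and Spark guarantees exactly-once accumulator updates only for updates performed inside actions, so restarted tasks will not double-count here. The accumulator's value is still merged on the driver, though, so this sidesteps collect only when the accumulated maps are small relative to the raw data.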

Result:

+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|Item_Id|Parent_Id|object_class_instance|Received_Time|CablesName|CablesStatus|CablesHInfoID|CablesIndex|object_class|ServiceTag|Scan_Time|relation_tree                  |
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|-0909  |1234     |Cables-1             |23-12-2020   |LC        |Installed   |ABCD1234     |0          |Cables      |ASDF123   |12345    |Start~>HInfo->Cables->Cables-1 |
|-09091 |1234111  |Cables-11            |23-12-2022   |LC1       |Installed1  |ABCD12341    |0          |Cables1     |ASDF1231  |123451   |Start~>HInfo->Cables->Cables-11|
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+

FinalMap : [Map(Scan_Time -> 123451, ServiceTag -> ASDF1231, Received_Time -> 23-12-2022, object_class_instance -> Cables-11, CablesHInfoID -> ABCD12341, Parent_Id -> 1234111, Item_Id -> -09091, CablesIndex -> 0, object_class -> Cables1, relation_tree -> Start~>HInfo->Cables->Cables-11, CablesName -> LC1, CablesStatus -> Installed1), Map(Scan_Time -> 12345, ServiceTag -> ASDF123, Received_Time -> 23-12-2020, object_class_instance -> Cables-1, CablesHInfoID -> ABCD1234, Parent_Id -> 1234, Item_Id -> -0909, CablesIndex -> 0, object_class -> Cables, relation_tree -> Start~>HInfo->Cables->Cables-1, CablesName -> LC, CablesStatus -> Installed)]
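If the goal is the nested, prefixed map shape from the question, the accumulated flat maps can be reshaped on the driver afterwards. A hedged sketch, assuming relation_tree plays the role of the question's r_path and that Parent_Id, Scan_Time and ServiceTag map onto P_Id, S_Time and S_Tag (this field mapping is an assumption, not from the source):

import scala.collection.JavaConverters._

// Reshape each accumulated flat map into the nested form from the question.
// The field-to-key mapping below is assumed, not taken from the source.
val nested = mutableMapAcc.value.asScala.map { m =>
  Map(
    "P_Id"   -> m.getOrElse("Parent_Id", ""),
    "R_Time" -> m.getOrElse("Received_Time", ""),
    "S_Time" -> m.getOrElse("Scan_Time", ""),
    "r_path" -> m.getOrElse("relation_tree", ""),
    "S_Tag"  -> m.getOrElse("ServiceTag", ""),
    m.getOrElse("relation_tree", "") -> m.toMap // nested map keyed by the path
  )
}
println(nested.mkString("\n"))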

Similar problem was solved here.