使用星火数据帧多列一排的变化值

问题描述 投票:-4回答:1

我得到这个格式的数据帧(DF)。

df.show()
********************
X1 | x2  | X3 | ..... | Xn   | id_1 | id_2 | .... id_23
1  |  ok |good|         john | null | null |     |null
2  |rick |good|       | ryan | null | null |     |null
....

我有一个数据帧中,我有很多的列和数据框被命名为DF。我需要在这个数据帧(DF)编辑列。我有2个地图,M1(整型>整数)和m2(整型>字符串)映射。

我需要看每个行和采取列X1值,看看X1的M1中的映射值,这将是在范围[1,23],让它成为5,并且还发现在平方米这将X1的映射值是这样的X8。我需要添加在列X8价值ID_5。我有以下的代码,但我不能得到这个工作。

val dfEdited = df.map( (row) => {
  val mapValue = row.getAs("X1")
  row.getAs("id_"+m1.get(mapValue)) = row.getAs(m2.get(mapValue)
})
scala apache-spark apache-spark-sql apache-spark-dataset
1个回答
2
投票

你在做什么在row.getAs("id_"+m1.get(mapValue)) = row.getAs(m2.get(mapValue)没有意义。

首先,你分配一个值来操作getAs("id_"+m1.get(mapValue)),它给你一个不变的值的结果。其次,因为你需要指定由该方法返回的数据类型不使用正确的方法getAs

我不知道是否我的理解正确,你想做什么,我猜你缺少一些细节。不管怎么说,这里是我得到了什么,它工作正常。

当然,我这样评价每一行代码,这样就可以很容易地理解它。

// First of all we need to create a case class to wrap the content of each row.
case class Schema(X1: Int, X2: String, X3: String, X4: String, id_1: Option[String], id_2: Option[String], id_3: Option[String])


val dfEdited = ds.map( row => {
  // We use the getInt method to get the value of a field which is expected to be Int
  val mapValue = row.getInt(row.fieldIndex("X1"))

  // fieldIndex gives you the position inside the row fo the field you are looking for. 
  // Regarding m1(mapValue), NullPointer might be thrown if mapValue is not in that Map. 
  // You need to implement mechanisms to deal with it (for example, an if...else clause, or using the method getOrElse)
  val indexToModify = row.fieldIndex("id_" + m1(mapValue)) 

  // We convert the row to a sequence, and pair each element with its index.
  // Then, with the map method we generate a new sequence.
  // We replace the element situated in the position indexToModify.
  // In addition, if there are null values, we have to convert it to an object of type Option.
  // It is necessary for the next step.
  val seq = row.toSeq.zipWithIndex.map(x => if (x._2 == indexToModify) Some(m2(mapValue)) else if(x._1 == null) None else x._1)


  // Finally, you have to create the Schema object by using pattern matching
  seq match {
    case Seq(x1: Int, x2: String, x3: String, x4: String, id_1: Option[String], id_2: Option[String], id_3: Option[String]) => Schema(x1, x2,x3,x4, id_1, id_2, id_3)
  }
})

一些评论:

  • 所述ds对象是一个数据集。数据集必须有一个结构。您不能修改行的地图方法内,返回他们,因为星火不会知道,如果数据集的结构发生了变化。出于这个原因,我回国的情况下类对象,因为它提供的数据集对象的结构。
  • 请记住,你可能有空值的问题。此代码可能会引发你的空指针,如果你不建立机制处理案件中,例如,X1的值不在M1。

希望它的工作原理。

© www.soinside.com 2019 - 2024. All rights reserved.