根据给定的操作列创建一个新的数据集。

问题描述 投票:0回答:1

我使用的是spark-sql-2.3.1v,有以下情况。

给定一个数据集。

val ds = Seq(
  (1, "x1", "y1", "0.1992019"),
  (2, null, "y2", "2.2500000"),
  (3, "x3", null, "15.34567"),
  (4, null, "y4", null),
  (5, "x4", "y4", "0")
   ).toDF("id","col_x", "col_y","value")

+---+-----+-----+---------+
| id|col_x|col_y|    value|
+---+-----+-----+---------+
|  1|   x1|   y1|0.1992019|
|  2| null|   y2|2.2500000|
|  3|   x3| null| 15.34567|
|  4| null|   y4|     null|
|  5|   x4|   y4|        0|
+---+-----+-----+---------+

要求。

我得到一个操作列(即。operationCol),我需要从外部对其进行一些计算。

当对列 "col_x "进行一些操作时,我需要通过过滤掉所有 "col_x "为空值的记录来创建一个新的数据集,并返回该新数据集。

同样,当对col_y列进行操作时,我需要过滤掉所有 "col_y "为空值的记录,创建一个新的数据集,并返回该新数据集。

例如:当操作Col=="col_y "时,我需要创建一个新的数据集,过滤掉所有 "col_y "为空值的记录,并返回新的数据集。

val operationCol ="col_x";

if(operationCol === "col_x"){
  //filter out all rows which has "col_x" null and return that new dataset.
}

if(operationCol === "col_y"){
  //filter out all rows which has "col_y" null and return that new dataset.
}

当操作Col === "col_x "时,预期输出:

+---+-----+-----+---------+
| id|col_x|col_y|    value|
+---+-----+-----+---------+
|  1|   x1|   y1|0.1992019|
|  3|   x3| null| 15.34567|
|  5|   x4|   y4|        0|
+---+-----+-----+---------+

当operationCol === "col_y" 预期输出:

+---+-----+-----+---------+
| id|col_x|col_y|    value|
+---+-----+-----+---------+
|  1|   x1|   y1|0.1992019|
|  2| null|   y2|2.2500000|
|  4| null|   y4|     null|
|  5|   x4|   y4|        0|
+---+-----+-----+---------+

如何实现这个预期输出?换句话说,如何完成数据框的分支?如何在流程中间创建一个新的dataframedataset?

apache-spark apache-spark-sql spark-streaming
1个回答
1
投票

您可以使用 df.na.drop() 来删除包含空值的行。drop函数可以接受一个你想考虑的列的列表作为输入,所以在这种情况下,你可以把它写成如下。

val newDf = df.na.drop(Seq(operationCol))

这将创建一个新的数据框 newDf 中的所有行,其中 operationCol 已被删除。


1
投票

您也可以使用 filter 以过滤出空值。


scala> val operationCol = "col_x" // for one column
operationCol: String = col_x

scala> ds.filter(col(operationCol).isNotNull).show(false)
+---+-----+-----+---------+
|id |col_x|col_y|value    |
+---+-----+-----+---------+
|1  |x1   |y1   |0.1992019|
|3  |x3   |null |15.34567 |
|5  |x4   |y4   |0        |
+---+-----+-----+---------+


scala> val operationCol = Seq("col_x","col_y") // For multiple Columns
operationCol: Seq[String] = List(col_x, col_y)

scala> ds.filter(operationCol.map(col(_).isNotNull).reduce(_ && _)).show
+---+-----+-----+---------+
| id|col_x|col_y|    value|
+---+-----+-----+---------+
|  1|   x1|   y1|0.1992019|
|  5|   x4|   y4|        0|
+---+-----+-----+---------+

© www.soinside.com 2019 - 2024. All rights reserved.