我使用的是spark-sql-2.3.1v,有以下情况。
给定一个数据集。
val ds = Seq(
(1, "x1", "y1", "0.1992019"),
(2, null, "y2", "2.2500000"),
(3, "x3", null, "15.34567"),
(4, null, "y4", null),
(5, "x4", "y4", "0")
).toDF("id","col_x", "col_y","value")
即
+---+-----+-----+---------+
| id|col_x|col_y| value|
+---+-----+-----+---------+
| 1| x1| y1|0.1992019|
| 2| null| y2|2.2500000|
| 3| x3| null| 15.34567|
| 4| null| y4| null|
| 5| x4| y4| 0|
+---+-----+-----+---------+
要求。
我得到一个操作列(即。operationCol
),我需要从外部对其进行一些计算。
当对列 "col_x "进行一些操作时,我需要通过过滤掉所有 "col_x "为空值的记录来创建一个新的数据集,并返回该新数据集。
同样,当对col_y列进行操作时,我需要过滤掉所有 "col_y "为空值的记录,创建一个新的数据集,并返回该新数据集。
例如:当操作Col=="col_y "时,我需要创建一个新的数据集,过滤掉所有 "col_y "为空值的记录,并返回新的数据集。
val operationCol ="col_x";
if(operationCol === "col_x"){
//filter out all rows which has "col_x" null and return that new dataset.
}
if(operationCol === "col_y"){
//filter out all rows which has "col_y" null and return that new dataset.
}
当操作Col === "col_x "时,预期输出:
+---+-----+-----+---------+
| id|col_x|col_y| value|
+---+-----+-----+---------+
| 1| x1| y1|0.1992019|
| 3| x3| null| 15.34567|
| 5| x4| y4| 0|
+---+-----+-----+---------+
当operationCol === "col_y" 预期输出:
+---+-----+-----+---------+
| id|col_x|col_y| value|
+---+-----+-----+---------+
| 1| x1| y1|0.1992019|
| 2| null| y2|2.2500000|
| 4| null| y4| null|
| 5| x4| y4| 0|
+---+-----+-----+---------+
如何实现这个预期输出?换句话说,如何完成数据框的分支?如何在流程中间创建一个新的dataframedataset?
您可以使用 df.na.drop()
来删除包含空值的行。drop函数可以接受一个你想考虑的列的列表作为输入,所以在这种情况下,你可以把它写成如下。
val newDf = df.na.drop(Seq(operationCol))
这将创建一个新的数据框 newDf
中的所有行,其中 operationCol
已被删除。
您也可以使用 filter
以过滤出空值。
scala> val operationCol = "col_x" // for one column
operationCol: String = col_x
scala> ds.filter(col(operationCol).isNotNull).show(false)
+---+-----+-----+---------+
|id |col_x|col_y|value |
+---+-----+-----+---------+
|1 |x1 |y1 |0.1992019|
|3 |x3 |null |15.34567 |
|5 |x4 |y4 |0 |
+---+-----+-----+---------+
scala> val operationCol = Seq("col_x","col_y") // For multiple Columns
operationCol: Seq[String] = List(col_x, col_y)
scala> ds.filter(operationCol.map(col(_).isNotNull).reduce(_ && _)).show
+---+-----+-----+---------+
| id|col_x|col_y| value|
+---+-----+-----+---------+
| 1| x1| y1|0.1992019|
| 5| x4| y4| 0|
+---+-----+-----+---------+