避免在nullSafeJoin后出现重复的coulms

问题描述 投票:0回答:1

我有一个用例,我需要加入 无效 列。我也是这样做的。

  def nullSafeJoin(leftDF: DataFrame, rightDF: DataFrame, joinOnColumns: Seq[String]) = {

    val dataset1 = leftDF.alias("dataset1")
    val dataset2 = rightDF.alias("dataset2")

    val firstColumn = joinOnColumns.head
    val colExpression: Column = (col(s"dataset1.$firstColumn").eqNullSafe(col(s"dataset2.$firstColumn")))

    val fullExpr = joinOnColumns.tail.foldLeft(colExpression) {
      (colExpression, p) => colExpression && (col(s"dataset1.$p").eqNullSafe(col(s"dataset2.$p")))
    }
    dataset1.join(dataset2, fullExpr)
  }

最后加入的数据集有重复的列。我已经试过使用别名来删除列,像这样。

dataset1.join(dataset2, fullExpr).drop(s"dataset2.$firstColumn")

但它不工作。我知道我们可以用选择列来代替删除。

我试图有一个通用的代码基础,所以不想把要选择的列的列表传递给函数(如果是drop,我将不得不直接把列表中的 加入列上 我们已经传递给函数)

如果有任何关于如何解决这个问题的指点,真的会很有帮助.谢谢!

编辑 : (样本数据)

leftDF :
+------------------+-----------+---------+---------+-------+
|                 A|          B|        C|        D| status|
+------------------+-----------+---------+---------+-------+
|             14567|         37|        1|     game|Enabled|
|             14567|       BASE|        1|      toy| Paused|
|             13478|       null|        5|     game|Enabled|
|              2001|       BASE|        1|     null| Paused|
|              null|         37|        1|     home|Enabled|
+------------------+-----------+---------+---------+-------+

rightDF :
+------------------+-----------+---------+
|                 A|          B|        C|
+------------------+-----------+---------+
|               140|         37|        1|
|               569|       BASE|        1|
|             13478|       null|        5|
|              2001|       BASE|        1|
|              null|         37|        1|
+------------------+-----------+---------+

Final Join (Required):
+------------------+-----------+---------+---------+-------+
|                 A|          B|        C|        D| status|
+------------------+-----------+---------+---------+-------+
|             13478|       null|        5|     game|Enabled|
|              2001|       BASE|        1|     null| Paused|
|              null|         37|        1|     home|Enabled|
+------------------+-----------+---------+---------+-------+
scala apache-spark apache-spark-dataset
1个回答
0
投票

你最后的DataFrame有重复的列,来自左DF & 右DF,没有标识符来检查该列是来自左DF还是右DF。

所以我把左DF & 右DF的列重命名了。left_[column_name] & 右侧DF列以 right_[column_name]

希望下面的代码能帮助你。

scala> :paste
// Entering paste mode (ctrl-D to finish)

  val left = Seq(("14567", "37", "1", "game", "Enabled"), ("14567", "BASE", "1", "toy", "Paused"), ("13478", "null", "5", "game", "Enabled"), ("2001", "BASE", "1", "null", "Paused"), ("null", "37", "1", "home", "Enabled")).toDF("a", "b", "c", "d", "status")
  val right = Seq(("140", "37", 1), ("569", "BASE", 1), ("13478", "null", 5), ("2001", "BASE", 1), ("null", "37", 1)).toDF("a", "b", "c")

  import org.apache.spark.sql.DataFrame
  def nullSafeJoin(leftDF: DataFrame, rightDF: DataFrame, joinOnColumns: Seq[String]):DataFrame = {
    val leftRenamedDF = leftDF
      .columns
      .map(c => (c, s"left_${c}"))
      .foldLeft(leftDF){ (df, c) =>
        df.withColumnRenamed(c._1, c._2)
      }
    val rightRenamedDF = rightDF
      .columns
      .map(c => (c, s"right_${c}"))
      .foldLeft(rightDF){(df, c) =>
        df.withColumnRenamed(c._1, c._2)
      }

    val fullExpr = joinOnColumns
      .tail
    .foldLeft($"left_${joinOnColumns.head}".eqNullSafe($"right_${joinOnColumns.head}")){(cee, p) =>
        cee && ($"left_${p}".eqNullSafe($"right_${p}"))
      }

    val finalColumns = joinOnColumns
      .map(c => col(s"left_${c}").as(c)) ++ // Taking All columns from Join columns
      leftDF.columns.diff(joinOnColumns).map(c => col(s"left_${c}").as(c)) ++ // Taking missing columns from leftDF
      rightDF.columns.diff(joinOnColumns).map(c => col(s"right_${c}").as(c)) // Taking missing columns from rightDF

    leftRenamedDF.join(rightRenamedDF, fullExpr).select(finalColumns: _*)
  }

scala>

最终的DataFrame结果是.NET。

scala> nullSafeJoin(left, right, Seq("a", "b", "c")).show(false)


// Exiting paste mode, now interpreting.

+-----+----+---+----+-------+
|a    |b   |c  |d   |status |
+-----+----+---+----+-------+
|13478|null|5  |game|Enabled|
|2001 |BASE|1  |null|Paused |
|null |37  |1  |home|Enabled|
+-----+----+---+----+-------+

© www.soinside.com 2019 - 2024. All rights reserved.