在 Spark Scala 中实现动态连接

问题描述 投票:0回答:1

我正在研究一个用例,我需要在 Spark scala 中实现动态连接。 举例来说,我有一个基表

tableA
,并且想加入
tableB
tableC
。目前有两个表需要连接,但将来可能会出现这样的情况:我可能需要连接 10 个以上的表。显然,每次都更改代码是不可行的。 我研究了以下解决方案 -

def joinDataframes(baseDf: DataFrame, 
                   dfList: List[(DataFrame, Seq[String], String)]) 
                  : DataFrame = {

  dfList.foldLeft(baseDf) {
    case (currentDf, (df, joinCols, joinType)) => 
      currentDf.join(df, joinCols, joinType)
  }
}

输入数据

val tableA = spark.createDataFrame(Seq(
      (1, "John"),
      (2, "Jane"),
      (3, "Alice")
    )).toDF("id", "name")

val tableB = spark.createDataFrame(Seq(
      (1, "John", "New York"),
      (2, "Jack", "Delhi"),
      (3, "Ned", "Bengaluru")
    )).toDF("id", "name", "city")

val tableC = spark.createDataFrame(Seq(
      (1, "Accounting"),
      (2, "Engineering"),
      (3, "HR")
    )).toDF("id", "department")

实施

val dfList = List(
  (tableB, Seq("id", "name"), "left"),
  (tableC, Seq("id"), "inner")  
)

val joinedDf = joinDataframes(tableA, dfList)

从这里可以看出,如果表和连接列实质上增加,尤其是通过参数传递,则

dfList
的创建可能会变得复杂且不可扩展。此外,基表和查找表的列名称也不同。
有没有更好的方法来处理这个用例?

scala apache-spark
1个回答
0
投票

我同意@eliasah提到的观点。您可以实施检查点来提高所创建的长沿袭的性能。

此外,我建议您将

List[(DataFrame, Seq[String], String)]
容器化到案例类中,以便具有可读性和类型安全性,因为编译器会捕获任何使用错误类型值的尝试。

case class JoinInfo(df: DataFrame, joinCols: Seq[String], joinType: String)

/**
 * Join the dataframes based on the joinInfos provided and return the resulting dataframe
 * @param baseDf The base dataframe to join with
 * @param joinInfos The list of dataframes to join with the base dataframe
 * @param shouldCheckpoint Whether to checkpoint the resulting dataframe
 * @return  The resulting dataframe after joining all the dataframes
 */
def joinDataframes(baseDf: DataFrame, joinInfos: List[JoinInfo], shouldCheckpoint: Boolean = false): DataFrame = {
  joinInfos.foldLeft(baseDf) {
    case (currentDf, JoinInfo(df, joinCols, joinType)) =>
      val joinedDf = currentDf.join(df, joinCols, joinType)
      if (shouldCheckpoint && spark.sparkContext.getCheckpointDir.isDefined) {
        joinedDf.checkpoint()
      } else {
        joinedDf
      }
  }
}

这也是相对可扩展的,因为如果您想添加额外的参数,您可以通过修改案例类而不是调整元素来实现。

您现在可以在数据上使用它,如下所示:

val tableA = // data
val tableB = // data
val tableC = // data

val joinInfos = List(
  JoinInfo(tableB, Seq("id", "name"), "inner"),
  JoinInfo(tableC, Seq("id"), "inner")
)

val joinedDf = joinDataframes(tableA, joinInfos, shouldCheckpoint = true)

在这里,当检查点在会话中启用时,您将执行检查点。当对 RDD 或 DataFrame 执行

checkpoint
时,Spark 会将数据保存到磁盘上的检查点目录中,并使用此保存的版本进行将来的操作。这意味着下次对 RDD 或 DataFrame 执行操作时,Spark 不需要计算整个谱系,而只需计算检查点之后应用的转换。

© www.soinside.com 2019 - 2024. All rights reserved.