I am working on a use case where I need to implement dynamic joins in Spark Scala. For example, I have a base table tableA and want to join tableB and tableC to it. There are only two tables to join at the moment, but in the future I may well need to join more than 10 tables. Obviously, changing the code every time is not feasible.
I have looked into the following solution:
import org.apache.spark.sql.DataFrame

// Fold over the (DataFrame, join columns, join type) triples,
// joining each one onto the accumulated result in order.
def joinDataframes(baseDf: DataFrame,
                   dfList: List[(DataFrame, Seq[String], String)]): DataFrame = {
  dfList.foldLeft(baseDf) {
    case (currentDf, (df, joinCols, joinType)) =>
      currentDf.join(df, joinCols, joinType)
  }
}
Input data
val tableA = spark.createDataFrame(Seq(
  (1, "John"),
  (2, "Jane"),
  (3, "Alice")
)).toDF("id", "name")

val tableB = spark.createDataFrame(Seq(
  (1, "John", "New York"),
  (2, "Jack", "Delhi"),
  (3, "Ned", "Bengaluru")
)).toDF("id", "name", "city")

val tableC = spark.createDataFrame(Seq(
  (1, "Accounting"),
  (2, "Engineering"),
  (3, "HR")
)).toDF("id", "department")
Implementation
val dfList = List(
  (tableB, Seq("id", "name"), "left"),
  (tableC, Seq("id"), "inner")
)
val joinedDf = joinDataframes(tableA, dfList)
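For reference, with the sample data above the result comes out roughly as follows: the left join on (id, name) only matches (1, John), so city is null for the other rows, while the inner join on id keeps all three rows (row order after a join is not guaranteed).

joinedDf.show()
// +---+-----+--------+-----------+
// | id| name|    city| department|
// +---+-----+--------+-----------+
// |  1| John|New York| Accounting|
// |  2| Jane|    null|Engineering|
// |  3|Alice|    null|         HR|
// +---+-----+--------+-----------+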
As you can see from this, building dfList can become complicated and hard to scale if the number of tables and join columns grows substantially, especially when they are passed in through parameters. Moreover, the column names of the base table and the lookup tables differ.

I agree with the point @eliasah made. You can implement checkpointing to improve the performance of the long lineage that gets created.
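To see that lineage for yourself, you can print the query plans of the chained result; every join adds another layer to the plan tree (a quick inspection sketch using the joinedDf from above):

// Prints the parsed, analyzed, optimized and physical plans;
// with many chained joins these grow with every join.
joinedDf.explain(true)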
Furthermore, I would suggest wrapping the List[(DataFrame, Seq[String], String)] triples in a case class for readability and type safety, since the compiler will catch any attempt to use a value of the wrong type.
case class JoinInfo(df: DataFrame, joinCols: Seq[String], joinType: String)

/**
 * Join the dataframes based on the joinInfos provided and return the resulting dataframe
 * @param baseDf The base dataframe to join with
 * @param joinInfos The list of dataframes to join with the base dataframe
 * @param shouldCheckpoint Whether to checkpoint each intermediate result
 *                         (only takes effect once a checkpoint directory is set)
 * @return The resulting dataframe after joining all the dataframes
 */
def joinDataframes(baseDf: DataFrame,
                   joinInfos: List[JoinInfo],
                   shouldCheckpoint: Boolean = false): DataFrame = {
  joinInfos.foldLeft(baseDf) {
    case (currentDf, JoinInfo(df, joinCols, joinType)) =>
      val joinedDf = currentDf.join(df, joinCols, joinType)
      // Guard on the checkpoint directory: calling checkpoint() without one
      // set would fail at runtime. The session is taken from the dataframe
      // itself, since no ambient spark value is in scope inside the method.
      if (shouldCheckpoint && joinedDf.sparkSession.sparkContext.getCheckpointDir.isDefined) {
        joinedDf.checkpoint()
      } else {
        joinedDf
      }
  }
}
This is also relatively extensible: if you want to pass extra parameters, you can do so by adding fields to the case class rather than fiddling with tuple positions.
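For instance, if some of the lookup tables are small, a hypothetical extra field could carry a broadcast hint; the broadcastRight field and the adjusted join line below are an illustration of that idea, not part of the original answer:

import org.apache.spark.sql.functions.broadcast

// Hypothetical extension: an optional broadcast flag per join.
case class JoinInfo(df: DataFrame,
                    joinCols: Seq[String],
                    joinType: String,
                    broadcastRight: Boolean = false)

// Inside the fold, the hint would be applied before joining:
//   val right = if (info.broadcastRight) broadcast(info.df) else info.df
//   currentDf.join(right, info.joinCols, info.joinType)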
You can now use it on your data like this:
val tableA = // data
val tableB = // data
val tableC = // data

val joinInfos = List(
  JoinInfo(tableB, Seq("id", "name"), "inner"),
  JoinInfo(tableC, Seq("id"), "inner")
)
val joinedDf = joinDataframes(tableA, joinInfos, shouldCheckpoint = true)
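Note that because of the isDefined guard, shouldCheckpoint = true is a silent no-op unless a checkpoint directory has been configured for the session. A minimal setup, with a placeholder path:

// In a real cluster this should point at reliable storage such as HDFS.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")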
Here, checkpointing is performed whenever it is enabled for the session. When you checkpoint an RDD or DataFrame, Spark saves the data to the checkpoint directory on disk and uses this saved version for future operations. This means that the next time an action is performed on the RDD or DataFrame, Spark does not need to recompute the entire lineage; it only has to compute the transformations applied after the checkpoint.
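Finally, to round off the "10 or more tables" scenario from the question: one way to avoid code changes entirely is to resolve table names to DataFrames and parse the join specification from parameters. The tables map, the spec format, and the parsing below are assumptions for illustration, not part of the answer above:

// Hypothetical lookup from table name to DataFrame (this could also be
// spark.table(name) against the catalog for registered tables).
val tables: Map[String, DataFrame] = Map("tableB" -> tableB, "tableC" -> tableC)

// Hypothetical spec format "name:col1|col2:joinType", e.g. passed as job arguments.
val specs = Seq("tableB:id|name:left", "tableC:id:inner")

val dynamicJoinInfos = specs.map { spec =>
  val Array(name, cols, joinType) = spec.split(":")
  JoinInfo(tables(name), cols.split("\\|").toSeq, joinType)
}.toList

val joinedDynamicDf = joinDataframes(tableA, dynamicJoinInfos)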