我正在尝试使用 Apache Spark，以唯一列“ID”为键，将记录更新或插入（upsert）到旧的 DataFrame 中。
/**
 * Upserts `newDS` into `oldDS`, matching rows on the key columns `usingColumns`.
 *
 * Every row of `oldDS` whose key appears in `newDS` is dropped (left-anti join), then all rows
 * of `newDS` are appended. The result has the column names, order and data types of `oldDS`:
 * `newDS` is first projected onto `oldDS`'s columns by [[selectAndCastColumns]], so columns
 * only present in `newDS` are dropped and columns missing from `newDS` become typed nulls.
 *
 * Rows are not de-duplicated: if `newDS` holds several rows with the same key, all of them
 * appear in the result.
 *
 * @param oldDS        the existing data; defines the output columns
 * @param newDS        the records to insert or replace
 * @param usingColumns key columns used to match rows; each must be present in `newDS`
 *                     (exact, case-sensitive name comparison)
 * @param useBroadcast if true, broadcast the (projected) `newDS` side of the anti-join
 * @return the rows of `oldDS` not replaced by `newDS`, followed by all rows of `newDS`
 * @throws IllegalArgumentException if any key column is missing from `newDS`
 */
def refreshUnion(oldDS: Dataset[_], newDS: Dataset[_], usingColumns: Seq[String],
                 useBroadcast: Boolean = false): Dataset[_] = {
  val newColumns = newDS.columns.toSet
  val missingKeys = usingColumns.filterNot(newColumns.contains)
  // selectAndCastColumns would fill a missing key with nulls: the anti-join would then replace
  // nothing and every new row would be inserted with a null key, silently breaking the upsert.
  require(missingKeys.isEmpty,
    s"newDS is missing key column(s): ${missingKeys.mkString(", ")}")

  // Backtick-quote names so columns containing '.' are not parsed as nested-field accesses.
  def quoted(name: String): String = "`" + name.replace("`", "``") + "`"

  val filteredNewDS = selectAndCastColumns(newDS, oldDS)
  val rightSide: Dataset[_] = if (useBroadcast) broadcast(filteredNewDS) else filteredNewDS

  oldDS.join(rightSide, usingColumns, "left_anti")
    // A using-join places the key columns first; restore oldDS's column order because the
    // union below matches columns by position, not by name.
    .select(oldDS.columns.map(name => col(quoted(name))): _*)
    .union(filteredNewDS.toDF)
}
/**
 * Projects `ds` onto the columns of `refDS`.
 *
 * The result has exactly `refDS`'s column names, in `refDS`'s order, each cast to the data
 * type that column has in `refDS`. Columns of `ds` that `refDS` lacks are dropped. Columns of
 * `refDS` that `ds` lacks are filled with nulls of the target type. Presence is decided by
 * exact (case-sensitive) name comparison against `ds.columns`.
 *
 * @param ds    the dataset to reshape
 * @param refDS the dataset whose column names, order and data types are imposed on `ds`
 * @return a DataFrame over the rows of `ds` laid out like `refDS`
 */
def selectAndCastColumns(ds: Dataset[_], refDS: Dataset[_]): Dataset[_] = {
  // Backtick-quote names so columns containing '.' are not parsed as nested-field accesses.
  def quoted(name: String): String = "`" + name.replace("`", "``") + "`"

  val available = ds.columns.toSet
  val refSchema = refDS.schema
  val projection = refDS.columns.map { name =>
    val targetType = refSchema(name).dataType
    val source = if (available.contains(name)) col(quoted(name)) else lit(null)
    source.cast(targetType).as(name)
  }
  ds.select(projection: _*)
}