How do I perform an UPSERT operation in Apache Spark?


I am trying to update and insert (upsert) records into an existing DataFrame using Apache Spark, matching on the unique column "ID".

apache apache-spark
1 Answer
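You can implement an upsert as a left anti join followed by a union: first align the new Dataset's columns and types with the old one, then keep only the old rows whose key does not appear in the new data, and finally append all of the new rows, so the new records win on key collisions. The two helpers below do exactly that: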
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{broadcast, col, lit}

// Upserts newDS into oldDS: old rows whose key (usingColumns) also appears
// in newDS are dropped via a left anti join, then every row of newDS is
// appended, so the new records replace the old ones on key collisions.
def refreshUnion(oldDS: Dataset[_], newDS: Dataset[_], usingColumns: Seq[String],
                 useBroadcast: Boolean = false): Dataset[_] = {
  // Align the new Dataset's columns and types with the old one first.
  val filteredNewDS = selectAndCastColumns(newDS, oldDS)
  oldDS.join(
    if (useBroadcast) broadcast(filteredNewDS) else filteredNewDS,
    usingColumns,
    "left_anti") // keep only the old rows that are not being updated
    .select(oldDS.columns.map(columnName => col(columnName)): _*)
    .union(filteredNewDS.toDF) // append all new and updated rows
}

// Projects ds onto refDS's columns, casting each to refDS's type;
// columns missing from ds are filled with typed nulls.
def selectAndCastColumns(ds: Dataset[_], refDS: Dataset[_]): Dataset[_] = {
  val columns = ds.columns.toSet
  ds.select(refDS.columns.map(c => {
    if (!columns.contains(c)) {
      lit(null).cast(refDS.schema(c).dataType) as c
    } else {
      ds(c).cast(refDS.schema(c).dataType) as c
    }
  }): _*)
}
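
For illustration, here is a minimal usage sketch; the sample data and the DataFrame names oldDF/newDF are hypothetical, and spark is assumed to be an active SparkSession:

import spark.implicits._

// Hypothetical data: "id" is the unique key.
val oldDF = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
val newDF = Seq((2, "B"), (4, "d")).toDF("id", "value")

val upserted = refreshUnion(oldDF, newDF, Seq("id"))
upserted.show()
// Rows 1 and 3 survive from oldDF; row 2 is replaced by newDF's version
// and row 4 is inserted (output row order is not guaranteed).

Passing useBroadcast = true adds a broadcast hint on the new Dataset, which can avoid a shuffle in the anti join when the new data is small.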