使用 Scala Spark 合并特定的数据帧

问题描述 投票:0回答:1

假设我在 Scala Spark 中有以下 2 个数据帧:

保存所有已处理记录的数据框:

身份证 姓名 颜色 处理时间戳 另一个时间戳
1 鲍勃 蓝色 171057948 171057948
2 爱丽丝 橙色 1711057948 1711057948
1 鲍勃 粉色 172057948 172057741

包含更新的数据框:

身份证 姓名 颜色 处理时间戳 另一个时间戳
1 鲍勃 粉色 172058000 172058000
2 爱丽丝 蓝色 172058000 172058000
3 史黛西 红色 172058000 172058000

将更新合并到主数据框中后,我希望主数据框看起来像这样:

身份证 姓名 颜色 处理时间戳 另一个时间戳
1 鲍勃 蓝色 171057948 171057948
2 爱丽丝 橙色 1711057948 1711057948
1 鲍勃 粉色 172058000 172058000
2 爱丽丝 蓝色 172058000 172058000
3 史黛西 红色 172058000 172058000

算法的文字含义是:

  • 如果更新数据帧中的一行已存在于主数据帧中,其中除时间戳列之外的列值相同,则无需向主数据帧添加新行,只需更新现有匹配行中的时间戳值即可。
  • 如果存在具有相同 ID 的行,但更新了某些属性(本例中的“名称”和“颜色”),请插入具有相同 ID 和更新数据帧中的时间戳的全新行
  • 如果主数据框中不存在ID,只需按原样插入(与第二种情况相同)

我是scala和spark新手,我怎样才能实现这个功能?理想情况下,函数签名看起来有点像这样:

import org.apache.spark.sql.DataFrame

def mergeDataFrames(mainDF: DataFrame, updateDF: DataFrame, mergeColumns: Seq[String], updateColumns: Seq[String]): DataFrame = {
//mergeColumns: ID, Name, Color
//updateColumns: ProcessingTimestamp, AnotherTimestamp
}

谢谢您的帮助。

scala apache-spark
1个回答
0
投票

至少有三种方法:

按所有键分组

如果 ID、名称和颜色是您真正需要的唯一逻辑:

import sparkSession.implicits._
import org.apache.spark.sql.functions.{collect_list, struct}
val original = Seq(
  (1,   "Bob",  "blue", 171057948,  171057948),
  (2,   "Alice",    "orange",   1711057948, 1711057948),
  (1,   "Bob", "pink",  172057948,172057741),
).toDF("ID","Name","Color","ProcessingTimestamp","AnotherTimestamp").selectExpr("*","'o' source")

val updated = Seq(
  (1,   "Bob",  "pink", 172058000,  172058000),
  (2,   "Alice",    "blue", 172058000,  172058000),
  (3,   "Stacy", "red", 172058000,172058000),
).toDF("ID","Name","Color","ProcessingTimestamp","AnotherTimestamp").selectExpr("*","'u' source")

val grouped = original.union(updated).groupBy("ID","Name","Color").agg(collect_list(struct("ProcessingTimestamp","AnotherTimestamp", "source")).as("grouped"))
val filtered = grouped.selectExpr("ID","Name","Color", "element_at(if(array_size(grouped) = 1, grouped, filter(grouped, x -> x.source = 'u')),1) structy").
  selectExpr("*","structy.*").drop("structy", "source")
filtered.show

即按关键字段联合分组并收集事实字段后,其中分组数据中的单行没有更新,其中多个更新(如果有多个更新,您可以使用像这样的聚合)下面有额外的逻辑)。

逻辑与OP描述的完全一样

完全联接和一些基于字段的选择(基于更新源是否存在):

// full join then filter
val updatedNewNames = updated.selectExpr("ID","Name","Color","ProcessingTimestamp as UProcessingTimestamp","AnotherTimestamp as UAnotherTimestamp", "source")
val joinedAndFiltered =
  original.drop("source").join(updatedNewNames, Seq("ID", "Name", "Color"), "full").
    selectExpr("ID", "Name", "Color", "if(source is null, ProcessingTimestamp, UProcessingTimestamp) ProcessingTimestamp",
      "if(source is null, AnotherTimestamp, UAnotherTimestamp) AnotherTimestamp")
joinedAndFiltered.show

我真的需要更复杂的逻辑

如果你真的想努力或者有更严格的逻辑,你可以联合,收集列表并撒上“一些”转换。

val groupedForMap = original.union(updated).groupBy("ID").agg(collect_list(struct("Name","Color","ProcessingTimestamp","AnotherTimestamp", "source")).as("grouped"))

val mapped =
  groupedForMap.selectExpr("id",
  """map_filter(
   aggregate(grouped, map(named_struct('Name','','Color',''),named_struct('ProcessingTimestamp',0,'AnotherTimestamp',0, 'source', '-')),
  (acc, x) ->
    if(acc[struct(x.Name, x.Color)] is null,
      -- we can just add to it with this awesome syntax
      map_concat(acc, map(struct(x.Name, x.Color), struct(x.ProcessingTimestamp, x.AnotherTimestamp, x.source))),
      if(acc[struct(x.Name, x.Color)].source = 'o' and x.source = 'u',
        -- we need to replace with this even more convoluted syntax
        map_concat( map_filter(acc, (k,v) -> k != struct(x.Name, x.Color)), map(struct(x.Name, x.Color), struct(x.ProcessingTimestamp, x.AnotherTimestamp, x.source))),
        acc
      )
      )),
      -- get rid of the starter
      (k, v) -> k != named_struct('Name','','Color','')
       ) mapped
      """)

val expanded = mapped.selectExpr("id","explode(mapped)").selectExpr("id","key.*","value.*")
expanded.show

所有收益(来自困难模式示例):

+---+-----+------+-------------------+----------------+------+
| id| Name| Color|ProcessingTimestamp|AnotherTimestamp|source|
+---+-----+------+-------------------+----------------+------+
|  1|  Bob|  blue|          171057948|       171057948|     o|
|  1|  Bob|  pink|          172058000|       172058000|     u|
|  2|Alice|orange|         1711057948|      1711057948|     o|
|  2|Alice|  blue|          172058000|       172058000|     u|
|  3|Stacy|   red|          172058000|       172058000|     u|
+---+-----+------+-------------------+----------------+------+

需要额外的“源”字段来区分新旧。

© www.soinside.com 2019 - 2024. All rights reserved.