Suppose I have the following 2 DataFrames in Scala Spark:

A DataFrame holding all already-processed records:

| ID | Name | Color | ProcessingTimestamp | AnotherTimestamp |
|---|---|---|---|---|
| 1 | Bob | blue | 171057948 | 171057948 |
| 2 | Alice | orange | 1711057948 | 1711057948 |
| 1 | Bob | pink | 172057948 | 172057741 |
A DataFrame containing the updates:

| ID | Name | Color | ProcessingTimestamp | AnotherTimestamp |
|---|---|---|---|---|
| 1 | Bob | pink | 172058000 | 172058000 |
| 2 | Alice | blue | 172058000 | 172058000 |
| 3 | Stacy | red | 172058000 | 172058000 |
After merging the updates into the main DataFrame, I want the main DataFrame to look like this:

| ID | Name | Color | ProcessingTimestamp | AnotherTimestamp |
|---|---|---|---|---|
| 1 | Bob | blue | 171057948 | 171057948 |
| 2 | Alice | orange | 1711057948 | 1711057948 |
| 1 | Bob | pink | 172058000 | 172058000 |
| 2 | Alice | blue | 172058000 | 172058000 |
| 3 | Stacy | red | 172058000 | 172058000 |
In words, the algorithm would be:
I'm new to Scala and Spark; how can I implement this? Ideally the function signature would look something like this:
```scala
import org.apache.spark.sql.DataFrame

def mergeDataFrames(mainDF: DataFrame, updateDF: DataFrame, mergeColumns: Seq[String], updateColumns: Seq[String]): DataFrame = {
  // mergeColumns: ID, Name, Color
  // updateColumns: ProcessingTimestamp, AnotherTimestamp
  ???
}
```
Thanks for your help.
There are at least three ways:
If grouping by ID, Name, and Color is really the only logic you need:
```scala
import sparkSession.implicits._
import org.apache.spark.sql.functions.{collect_list, struct}

val original = Seq(
  (1, "Bob", "blue", 171057948, 171057948),
  (2, "Alice", "orange", 1711057948, 1711057948),
  (1, "Bob", "pink", 172057948, 172057741)
).toDF("ID", "Name", "Color", "ProcessingTimestamp", "AnotherTimestamp")
  .selectExpr("*", "'o' source")

val updated = Seq(
  (1, "Bob", "pink", 172058000, 172058000),
  (2, "Alice", "blue", 172058000, 172058000),
  (3, "Stacy", "red", 172058000, 172058000)
).toDF("ID", "Name", "Color", "ProcessingTimestamp", "AnotherTimestamp")
  .selectExpr("*", "'u' source")

val grouped = original.union(updated)
  .groupBy("ID", "Name", "Color")
  .agg(collect_list(struct("ProcessingTimestamp", "AnotherTimestamp", "source")).as("grouped"))

val filtered = grouped
  .selectExpr("ID", "Name", "Color",
    "element_at(if(array_size(grouped) = 1, grouped, filter(grouped, x -> x.source = 'u')), 1) structy")
  .selectExpr("*", "structy.*")
  .drop("structy", "source")

filtered.show
```
That is: union the two frames, group by the key fields, and collect the fact fields. A single row in the grouped data means there was no update; where multiple rows exist, the update row is kept via the `source` filter. (If you can have more than one update per key, you would need extra logic, such as the aggregate used further below.)
A full join plus some per-field selection, based on whether an update-side row exists:
```scala
// full join, then pick each field based on whether the update side matched
val updatedNewNames = updated.selectExpr("ID", "Name", "Color",
  "ProcessingTimestamp as UProcessingTimestamp",
  "AnotherTimestamp as UAnotherTimestamp", "source")

val joinedAndFiltered = original.drop("source")
  .join(updatedNewNames, Seq("ID", "Name", "Color"), "full")
  .selectExpr("ID", "Name", "Color",
    "if(source is null, ProcessingTimestamp, UProcessingTimestamp) ProcessingTimestamp",
    "if(source is null, AnotherTimestamp, UAnotherTimestamp) AnotherTimestamp")

joinedAndFiltered.show
```
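The full-join approach can also be wrapped into the generic signature you asked for. A minimal sketch, assuming the update columns are exactly the non-key columns shared by both frames, and that update values are never legitimately null (the `coalesce` picks the update value when one exists, otherwise falls back to the original):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

def mergeDataFrames(mainDF: DataFrame, updateDF: DataFrame,
                    mergeColumns: Seq[String], updateColumns: Seq[String]): DataFrame = {
  // rename the update-side fact columns so they don't collide after the join
  val renamed = updateDF.select(
    (mergeColumns.map(col) ++ updateColumns.map(c => col(c).as(s"u_$c"))): _*)

  mainDF
    .join(renamed, mergeColumns, "full")
    .select(
      (mergeColumns.map(col) ++
        // assumption: update values are non-null, so coalesce prefers the update side
        updateColumns.map(c => coalesce(col(s"u_$c"), col(c)).as(c))): _*)
}

// e.g. mergeDataFrames(original.drop("source"), updated.drop("source"),
//   Seq("ID", "Name", "Color"), Seq("ProcessingTimestamp", "AnotherTimestamp"))
```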
If you really want to do it the hard way, or have stricter logic, you can union, collect_list, and sprinkle in "some" transformation:
```scala
val groupedForMap = original.union(updated)
  .groupBy("ID")
  .agg(collect_list(struct("Name", "Color", "ProcessingTimestamp", "AnotherTimestamp", "source")).as("grouped"))

val mapped = groupedForMap.selectExpr("id",
  """map_filter(
    aggregate(grouped, map(named_struct('Name','','Color',''), named_struct('ProcessingTimestamp',0,'AnotherTimestamp',0,'source','-')),
      (acc, x) ->
        if(acc[struct(x.Name, x.Color)] is null,
          -- we can just add to it with this awesome syntax
          map_concat(acc, map(struct(x.Name, x.Color), struct(x.ProcessingTimestamp, x.AnotherTimestamp, x.source))),
          if(acc[struct(x.Name, x.Color)].source = 'o' and x.source = 'u',
            -- we need to replace with this even more convoluted syntax
            map_concat(map_filter(acc, (k, v) -> k != struct(x.Name, x.Color)), map(struct(x.Name, x.Color), struct(x.ProcessingTimestamp, x.AnotherTimestamp, x.source))),
            acc
          )
        )),
    -- get rid of the starter entry
    (k, v) -> k != named_struct('Name','','Color','')
  ) mapped
  """)

val expanded = mapped.selectExpr("id", "explode(mapped)").selectExpr("id", "key.*", "value.*")
expanded.show
```
All of these yield (output shown from the hard-mode example):
```
+---+-----+------+-------------------+----------------+------+
| id| Name| Color|ProcessingTimestamp|AnotherTimestamp|source|
+---+-----+------+-------------------+----------------+------+
|  1|  Bob|  blue|          171057948|       171057948|     o|
|  1|  Bob|  pink|          172058000|       172058000|     u|
|  2|Alice|orange|         1711057948|      1711057948|     o|
|  2|Alice|  blue|          172058000|       172058000|     u|
|  3|Stacy|   red|          172058000|       172058000|     u|
+---+-----+------+-------------------+----------------+------+
```
The extra "source" field is needed to tell old rows apart from new ones.