我有一个包含大量记录的数据框。在该DF中,记录可以重复多次,每次更新时,最后更新的字段将具有修改记录的日期。
我们有一组列,我们想要比较类似id的行。在此比较期间,我们希望捕获从先前记录到当前记录的字段/列已更改的内容,并在更新记录的“updated_columns”列中捕获该字段/列。将此第二条记录与第三条记录进行比较并识别更新的列并在第三条记录的“updated_columns”字段中捕获该列,继续相同,直到该id的最后一条记录为每个具有多个条目的id执行相同的操作。
最初,我们对列进行分组,并从该组列中创建一个哈希值,并与下一行的哈希值进行比较,这样它可以帮助我识别具有更新的记录,但是需要更新的列。
在这里,我正在分享一些数据,这是预期的结果,这就是最终数据在添加更新列之后应该如何看待(这里我可以说,使用列Col1,Col2,Col3,col4和Col5进行两行之间的比较):
想要以有效的方式做到这一点。有人试过这样的事。
寻求帮助!
〜克里斯
可以使用window。
我们的想法是按ID对数据进行分组,按LAST-UPDATED对其进行排序,将前一行(如果存在)的值复制到当前行中,然后将复制的数据与当前值进行比较。
val data = ... //the dataframe has the columns ID,Col1,Col2,Col3,Col4,Col5,LAST_UPDATED,IS_DELETED
val fieldNames = data.schema.fieldNames.dropRight(1) //1
val columns = fieldNames.map(f => col(f))
val windowspec = Window.partitionBy("ID").orderBy("LAST_UPDATED") //2
def compareArrayUdf() = ... //3
val result = data
.withColumn("cur", array(columns: _*)) //4
.withColumn("prev", lag($"cur", 1).over(windowspec)) //5
.withColumn("updated_columns", compareArrayUdf()($"cur", $"prev")) //6
.drop("cur", "prev") //7
.orderBy("LAST_UPDATED")
备注:
compareArraysUdf是
def compareArray(cur: mutable.WrappedArray[String], prev: mutable.WrappedArray[String]): String = {
if (prev == null || cur == null) return ""
val res = new StringBuilder
for (i <- cur.indices) {
if (!cur(i).contentEquals(prev(i))) {
if (res.nonEmpty) res.append(",")
res.append(fieldNames(i))
}
}
res.toString()
}
def compareArrayUdf() = udf[String, mutable.WrappedArray[String], mutable.WrappedArray[String]](compareArray)
您可以将DataFrame或DataSet连接到自身,连接两行中id相同的行,其中左行的版本为i
,右行的版本为i+1
。这是一个例子
case class T(id: String, version: Int, data: String)
val data = Seq(T("1", 1, "d1-1"), T("1", 2, "d1-2"), T("2", 1, "d2-1"), T("2", 2, "d2-2"), T("2", 3, "d2-3"), T("3", 1, "d3-1"))
data: Seq[T] = List(T(1,1,d1-1), T(1,2,d1-2), T(2,1,d2-1), T(2,2,d2-2), T(2,3,d2-3), T(3,1,d3-1))
val ds = data.toDS
val joined = ds.as("ds1").join(ds.as("ds2"), $"ds1.id" === $"ds2.id" && (($"ds1.version"+1) === $"ds2.version"))
然后你可以引用新的DataFrame / DataSet中的列,如$"ds1.data
和$"ds2.data
等。
要查找数据从一个版本更改为另一个版本的行,您可以执行此操作
joined.filter($"ds1.data" !== $"ds2.data")