Spark数据帧中的列值比较

问题描述 投票:1回答:2

我有一个包含大量记录的数据框。在该DF中,记录可以重复多次,每次更新时,最后更新的字段将具有修改记录的日期。

我们有一组列,我们想要比较类似id的行。在此比较期间,我们希望捕获从先前记录到当前记录的字段/列已更改的内容,并在更新记录的“updated_columns”列中捕获该字段/列。将此第二条记录与第三条记录进行比较并识别更新的列并在第三条记录的“updated_columns”字段中捕获该列,继续相同,直到该id的最后一条记录为每个具有多个条目的id执行相同的操作。

最初,我们对列进行分组,并从该组列中创建一个哈希值,并与下一行的哈希值进行比较,这样它可以帮助我识别具有更新的记录,但是需要更新的列。

在这里,我正在分享一些数据,这是预期的结果,这就是最终数据在添加更新列之后应该如何看待(这里我可以说,使用列Col1,Col2,Col3,col4和Col5进行两行之间的比较):

enter image description here

想要以有效的方式做到这一点。有人试过这样的事。

寻求帮助!

〜克里斯

apache-spark apache-spark-sql
2个回答
1
投票

可以使用window

我们的想法是按ID对数据进行分组,按LAST-UPDATED对其进行排序,将前一行(如果存在)的值复制到当前行中,然后将复制的数据与当前值进行比较。

val data = ... //the dataframe has the columns ID,Col1,Col2,Col3,Col4,Col5,LAST_UPDATED,IS_DELETED

val fieldNames = data.schema.fieldNames.dropRight(1) //1
val columns = fieldNames.map(f => col(f))
val windowspec = Window.partitionBy("ID").orderBy("LAST_UPDATED") //2
def compareArrayUdf() = ... //3

val result = data
  .withColumn("cur", array(columns: _*)) //4
  .withColumn("prev", lag($"cur", 1).over(windowspec)) //5
  .withColumn("updated_columns", compareArrayUdf()($"cur", $"prev")) //6
  .drop("cur", "prev") //7
  .orderBy("LAST_UPDATED")

备注:

  1. 创建要比较的所有字段的列表。使用除最后一个(最后更新)之外的所有字段
  2. 创建一个按ID分区的窗口,每个分区按LAST-UPDATED排序
  3. 创建一个比较两个数组并将发现的差异映射到字段名称的udf,代码见下文
  4. 创建一个包含应比较的所有值的新列
  5. 创建一个新列,其中包含应比较的上一行的所有值(通过使用lag函数)。前一行是具有相同ID且最大LAST-UPDATED小于当前行的行。该字段可以为null
  6. 比较两个新列并将结果放入更新列
  7. 删除在步骤3和4中创建的两个中间列

compareArraysUdf是

def compareArray(cur: mutable.WrappedArray[String], prev: mutable.WrappedArray[String]): String = {
  if (prev == null || cur == null) return ""
  val res = new StringBuilder
  for (i <- cur.indices) {
    if (!cur(i).contentEquals(prev(i))) {
      if (res.nonEmpty) res.append(",")
      res.append(fieldNames(i))
    }
  }
  res.toString()
}
def compareArrayUdf() = udf[String, mutable.WrappedArray[String], mutable.WrappedArray[String]](compareArray)

0
投票

您可以将DataFrame或DataSet连接到自身,连接两行中id相同的行,其中左行的版本为i,右行的版本为i+1。这是一个例子

case class T(id: String, version: Int, data: String)

val data = Seq(T("1", 1, "d1-1"), T("1", 2, "d1-2"), T("2", 1, "d2-1"), T("2", 2, "d2-2"), T("2", 3, "d2-3"), T("3", 1, "d3-1"))
data: Seq[T] = List(T(1,1,d1-1), T(1,2,d1-2), T(2,1,d2-1), T(2,2,d2-2), T(2,3,d2-3), T(3,1,d3-1))

val ds = data.toDS

val joined = ds.as("ds1").join(ds.as("ds2"), $"ds1.id" === $"ds2.id" && (($"ds1.version"+1) === $"ds2.version"))

然后你可以引用新的DataFrame / DataSet中的列,如$"ds1.data$"ds2.data等。

要查找数据从一个版本更改为另一个版本的行,您可以执行此操作

joined.filter($"ds1.data" !== $"ds2.data")
© www.soinside.com 2019 - 2024. All rights reserved.