在Spark SQL数据框中压缩并分解多列

问题描述 投票:1回答:2

我具有以下结构的数据框:

A: Array[String]   | B: Array[String] | [ ... multiple other columns ...]
=========================================================================
[A, B, C, D]       | [1, 2, 3, 4]     | [ ... array with 4 elements ... ]
[E, F, G, H, I]    | [5, 6, 7, 8, 9]  | [ ... array with 5 elements ... ]
[J]                | [10]             | [ ... array with 1 element ...  ]

我想写一个UDF,那

  1. 在DF中每列的第i个位置上压缩元素
  2. 在每个压缩的元组上分解DF

结果列应如下所示:

ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]

目前,我正在对这样的UDF使用多次调用(每个列名称一个,列名称列表在运行时之前被收集):

val myudf6 = udf((xa:Seq[Seq[String]],xb:Seq[String]) => {
  xa.indices.map(i => {
    xa(i) :+ xb(i) // Add one element to the zip column
  })
})

val allColumnNames = df.columns.filter(...)    

for (columnName <- allColumnNames) {
  df = df.withColumn("zipped", myudf8(df("zipped"), df(columnName))
}
df = df.explode("zipped")

由于数据帧可以具有数百列,因此withColumn的此迭代调用似乎需要很长时间。

问题:这可能与一个UDF和一个DF.withColumn(...)调用有关吗?

重要

:UDF应该压缩动态列数(在运行时读取)。

我具有以下结构的数据框:A:Array [String] | B:数组[String] | [...多其他列...] ======================================== ================================== [A,...

apache-spark apache-spark-sql user-defined-functions apache-spark-dataset array-explode
2个回答
0
投票

如果您知道并确定数组中的值数量,则可以使用以下更简单的解决方案之一


0
投票

使用UDF,它使用可变列数作为输入。这可以通过数组数组来完成(假设类型相同)。由于您有一个数组数组,因此可以使用transpose,其效果与将列表压缩在一起的效果相同。然后可以分解生成的数组。

© www.soinside.com 2019 - 2024. All rights reserved.