I have a DataFrame whose schema looks like this:
root
 |-- category: string (nullable = true)
 |-- signals: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)
I want to filter/manipulate/transform the signals array of structs without exploding it. What is the best way to do that?
To filter/manipulate/transform an array of structs without exploding it, one good approach is a UDF, a.k.a. a user-defined function. UDFs extend the framework's built-in functionality and let the same logic be reused across multiple DataFrames.
Transform: to add a new column, pass the array to a UDF and iterate over it without exploding it.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def udf_signals_transformation(signal):
    # Concatenate the name fields of every struct in the array
    name = ""
    for s in signal:
        name = f"{name} {s.firstName} {s.lastName} - "
    return name

tdf = sdf.withColumn("names", udf_signals_transformation(col("signals")))
display(tdf)
Filter: to filter rows based on some nested condition, pass the array to a UDF, iterate over it, check the condition, and set a temporary flag. Finally, filter on that flag with the regular Spark API and drop the temporary column.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

@udf(returnType=BooleanType())
def udf_signals_filter(signal):
    # Flag the row if any struct in the array matches the condition
    filter_cond = False
    for s in signal:
        if s.category == "condition":
            filter_cond = True
    return filter_cond

tdf = sdf.withColumn("tempfilter", udf_signals_filter(col("signals")))
tdf = tdf.where(col("tempfilter") == True).drop("tempfilter")
display(tdf)
For array-typed data, consider using Spark's built-in higher-order functions. Here are a few examples:
import org.apache.spark.sql.functions._
import spark.implicits._

case class Signal(category: String, firstName: String, lastName: String)

val df = Seq(
  ("c1", Seq(Signal("c1", "David", "O"), Signal("c1", "Michelle", "D"))),
  ("c2", Seq(Signal("c2", "John", "D"), Signal("c2", "Amy", "O"), Signal("c2", "Tom", "O")))
).toDF("category", "signals")
filter:
df.withColumn("lastNameD", expr(
"filter(signals, x -> x.lastName = 'D')"
)).show(false)
// +--------+-------------------------------------------+-------------------+
// |category|signals |lastNameD |
// +--------+-------------------------------------------+-------------------+
// |c1 |[{c1, David, O}, {c1, Michelle, D}] |[{c1, Michelle, D}]|
// |c2 |[{c2, John, D}, {c2, Amy, O}, {c2, Tom, O}]|[{c2, John, D}] |
// +--------+-------------------------------------------+-------------------+
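If the goal is instead to keep or drop whole rows based on a nested condition (what the UDF filter above achieves), the exists higher-order function works the same way. A minimal sketch against the same df:
// Keep only rows where at least one struct satisfies the predicate
df.where(expr("exists(signals, x -> x.lastName = 'D')")).show(false)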
transform:
df.withColumn("fullName", expr(
"transform(signals, x -> concat(x.category, ': ', x.firstName, ' ', x.lastName))"
)).show(false)
// +--------+-------------------------------------------+----------------------------------+
// |category|signals |fullName |
// +--------+-------------------------------------------+----------------------------------+
// |c1 |[{c1, David, O}, {c1, Michelle, D}] |[c1: David O, c1: Michelle D] |
// |c2 |[{c2, John, D}, {c2, Amy, O}, {c2, Tom, O}]|[c2: John D, c2: Amy O, c2: Tom O]|
// +--------+-------------------------------------------+----------------------------------+
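transform is not limited to producing strings; to modify a field while keeping the array-of-structs shape, each element can be rebuilt with named_struct. A sketch (the signalsUpper column name is illustrative, not from the original):
// Rebuild every struct, listing each field explicitly; here lastName is upper-cased
df.withColumn("signalsUpper", expr(
  "transform(signals, x -> named_struct('category', x.category, 'firstName', x.firstName, 'lastName', upper(x.lastName)))"
)).show(false)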
aggregate:
df.withColumn("lastNameCount", expr("""
aggregate(signals, cast(map() as map<string, integer>), (acc, x) ->
case when acc[x.lastName] is null then map_concat(acc, map(x.lastName, 1)) else
map_concat(map_filter(acc, (k, v) -> k != x.lastName), map(x.lastName, acc[x.lastName] + 1))
end)
""".stripMargin
)).show(false)
// +--------+-------------------------------------------+----------------+
// |category|signals |lastNameCount |
// +--------+-------------------------------------------+----------------+
// |c1 |[{c1, David, O}, {c1, Michelle, D}] |{O -> 1, D -> 1}|
// |c2 |[{c2, John, D}, {c2, Amy, O}, {c2, Tom, O}]|{D -> 1, O -> 2}|
// +--------+-------------------------------------------+----------------+
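For reference, aggregate takes three arguments: the array, an initial accumulator value, and a merge function. A simpler sketch that folds the array into a single string (the firstNames column name is illustrative):
// Start from an empty string and append each element's firstName
df.withColumn("firstNames", expr(
  "aggregate(signals, '', (acc, x) -> concat(acc, ' ', x.firstName))"
)).show(false)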
Note that the above examples run the higher-order functions with SQL semantics via expr. Spark 3+ also allows expressing these functions in a Scala-like lambda syntax; however, that does not appear to work well when dealing with arrays of structs.
If expanding the array is acceptable, one could also use inline together with groupBy/agg (or Window/partitionBy) to expand and process the array of structs, as in the example below:
df.
  select(expr("inline(signals)")).
  groupBy("category", "lastName").agg(count(lit(1))).
  show
// +--------+--------+--------+
// |category|lastName|count(1)|
// +--------+--------+--------+
// | c1| D| 1|
// | c2| D| 1|
// | c2| O| 2|
// | c1| O| 1|
// +--------+--------+--------+
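For the Window/partitionBy variant mentioned above, a sketch that attaches the same per-group count while keeping every inlined row:
import org.apache.spark.sql.expressions.Window

// Each inlined row is retained, annotated with its group's count
df.select(expr("inline(signals)")).
  withColumn("lastNameCount", count(lit(1)).over(Window.partitionBy("category", "lastName"))).
  show()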