Manipulating an array of structs in Scala Spark 3.1

Problem description (votes: 0, answers: 2)

I have a dataframe whose schema looks like this:

root
 |-- category: string (nullable = true)
 |-- signals: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)

I would like to filter/manipulate/transform the signals array of structs without exploding it. What is the best way to do this?

scala apache-spark
2 Answers

0 votes

You can filter/manipulate/transform an array of structs without exploding it by using a UDF, a.k.a. a user-defined function. UDFs extend the framework's built-in functionality and let you reuse the same logic across multiple DataFrames. (The snippets below are PySpark; a Scala sketch follows each one.)

Transform: to add a new column, pass the array to the UDF and iterate over it without exploding it.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def udf_signals_transformation(signals):
    # Build one string out of every struct in the array.
    name = ""
    for s in signals:
        name = f"{name} {s.firstName} {s.lastName} - "
    return name

tdf = sdf.withColumn("names", udf_signals_transformation(col("signals")))
display(tdf)
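Since the question asks about Scala, here is a minimal sketch of the same idea as a Scala UDF, assuming the sdf DataFrame from the snippet above. Array elements of struct type reach a Scala UDF as Row values:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Join the first and last name of every struct into one string.
val signalsTransformation = udf((signals: Seq[Row]) =>
  signals
    .map(s => s"${s.getAs[String]("firstName")} ${s.getAs[String]("lastName")}")
    .mkString(" - "))

val tdf = sdf.withColumn("names", signalsTransformation(col("signals")))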

Filter: to filter rows on some nested condition, pass the array to a UDF, iterate over it checking the condition, and set a temporary flag. Finally, filter on that flag with the regular Spark API.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

@udf(returnType=BooleanType())
def udf_signals_filter(signals):
    # True if any struct in the array matches the nested condition.
    filter_cond = False
    for s in signals:
        if s.category == "condition":
            filter_cond = True
    return filter_cond

tdf = sdf.withColumn("tempfilter", udf_signals_filter(col("signals")))
tdf = tdf.where(col("tempfilter") == True).drop("tempfilter")
display(tdf)
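The Scala counterpart of the filter, again as a rough sketch. Because the UDF returns a boolean, it can be passed to where directly, which makes the temporary flag column unnecessary:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// True if any struct in the array has category == "condition".
val signalsFilter = udf((signals: Seq[Row]) =>
  signals.exists(_.getAs[String]("category") == "condition"))

val filtered = sdf.where(signalsFilter(col("signals")))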

0 votes

For array-typed data, consider using Spark's higher-order functions. Here are a few examples:

// assumes a SparkSession available as spark (as in spark-shell)
import org.apache.spark.sql.functions._
import spark.implicits._

case class Signal(category: String, firstName: String, lastName: String)

val df = Seq(
  ("c1", Seq(Signal("c1", "David", "O"), Signal("c1", "Michelle", "D"))),
  ("c2", Seq(Signal("c2", "John", "D"), Signal("c2", "Amy", "O"), Signal("c2", "Tom", "O")))
).toDF("category", "signals")

filter

df.withColumn("lastNameD", expr(
    "filter(signals, x -> x.lastName = 'D')"
  )).show(false)
// +--------+-------------------------------------------+-------------------+
// |category|signals                                    |lastNameD          |
// +--------+-------------------------------------------+-------------------+
// |c1      |[{c1, David, O}, {c1, Michelle, D}]        |[{c1, Michelle, D}]|
// |c2      |[{c2, John, D}, {c2, Amy, O}, {c2, Tom, O}]|[{c2, John, D}]    |
// +--------+-------------------------------------------+-------------------+

transform

df.withColumn("fullName", expr(
    "transform(signals, x -> concat(x.category, ': ', x.firstName, ' ', x.lastName))"
  )).show(false)
// +--------+-------------------------------------------+----------------------------------+
// |category|signals                                    |fullName                          |
// +--------+-------------------------------------------+----------------------------------+
// |c1      |[{c1, David, O}, {c1, Michelle, D}]        |[c1: David O, c1: Michelle D]     |
// |c2      |[{c2, John, D}, {c2, Amy, O}, {c2, Tom, O}]|[c2: John D, c2: Amy O, c2: Tom O]|
// +--------+-------------------------------------------+----------------------------------+

aggregate

df.withColumn("lastNameCount", expr("""
    aggregate(signals, cast(map() as map<string, integer>), (acc, x) -> 
      case when acc[x.lastName] is null then map_concat(acc, map(x.lastName, 1)) else
        map_concat(map_filter(acc, (k, v) -> k != x.lastName), map(x.lastName, acc[x.lastName] + 1))
      end)
  """.stripMargin
  )).show(false)
// +--------+-------------------------------------------+----------------+
// |category|signals                                    |lastNameCount   |
// +--------+-------------------------------------------+----------------+
// |c1      |[{c1, David, O}, {c1, Michelle, D}]        |{O -> 1, D -> 1}|
// |c2      |[{c2, John, D}, {c2, Amy, O}, {c2, Tom, O}]|{D -> 1, O -> 2}|
// +--------+-------------------------------------------+----------------+

Note that the examples above run the higher-order functions with SQL semantics via expr. Spark 3+ also allows these functions to be expressed in a Scala-like lambda syntax, although that form does not seem as effective when dealing with arrays of structs.
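For reference, a sketch of what that Column-based syntax looks like for the filter and transform examples above; struct fields are accessed with getField:

import org.apache.spark.sql.functions.{col, concat, filter, lit, transform}

// The same filter/transform as the expr versions, written with the
// Column-based lambda API available since Spark 3.0.
df.
  withColumn("lastNameD",
    filter(col("signals"), x => x.getField("lastName") === "D")).
  withColumn("fullName",
    transform(col("signals"), x =>
      concat(x.getField("category"), lit(": "),
        x.getField("firstName"), lit(" "), x.getField("lastName")))).
  show(false)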


An alternative approach is to use inline together with groupBy/agg (or Window/partitionBy; a sketch of that variant follows the example) to expand and then process the array of structs, as shown below:

df.
  select(expr("inline(signals)")).
  groupBy("category", "lastName").agg(count("*")).
  show
// +--------+--------+--------+
// |category|lastName|count(1)|
// +--------+--------+--------+
// |      c1|       D|       1|
// |      c2|       D|       1|
// |      c2|       O|       2|
// |      c1|       O|       1|
// +--------+--------+--------+
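And a sketch of the Window/partitionBy variant mentioned above, which attaches the count to every expanded row instead of collapsing them:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, expr}

// Expand the structs, then add a per-(category, lastName) count
// to each row rather than aggregating the rows away.
df.
  select(expr("inline(signals)")).
  withColumn("lastNameCount",
    count("*").over(Window.partitionBy("category", "lastName"))).
  show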