我有具有以下架构的 Spark DataFrame,如下所示
|-- col1 : string
|-- col2 : string
|-- customer: struct
| |-- smt: string
| |-- attributes: array (nullable = true)
| | |-- element: struct
| | | |-- key: string
| | | |-- value: string
df:
col1 | col2 | 客户 |
---|---|---|
col1_XX | col2_XX | “属性”:[[{“键”:“AUS 1”,“值”:“56”},{“键”:“BS 1”,“值”:“45”}] |
我想要一个输出,其中仅当键以 A 开头时才会采用客户列的值。我无法在该代码行中放置正则表达式,如图所示。
df = df.withColumn('AUS',expr("filter(customer.attributes,x->x.key='AUS')")[0]["value"]
预期:
col1 | col2 | 客户 | 澳大利亚 |
---|---|---|---|
col1_XX | col2_XX | “属性”:[[{“键”:“AUS 1”,“值”:“56”},{“键”:“BS 1”,“值”:“45”}] | 56 |
得到:
col1 | col2 | 客户 | 澳大利亚 |
---|---|---|---|
col1_XX | col2_XX | “属性”:[[{“键”:“AUS 1”,“值”:“56”},{“键”:“BS 1”,“值”:“45”}] | 空 |
您可以在类似 SQL 的语句中使用
filter
函数。像这样的东西:
import pyspark.sql.functions as F
df.withColumn("customer", F.expr("filter(customer, x -> substring(x.key, 0, 1) = 'A')"))
这将仅过滤以“A”开头的那些。您也可以分解列并在之后应用过滤器:
import pyspark.sql.functions as F
df.withColumn("_map", F.explode("customer")).filter((F.col("_map").key.startswith("A")))
之后,您可以遍历键并根据需要创建列(如果事先不知道)。