将URI查询字符串转换为PySpark中的结构键值数组

问题描述 投票:0回答:1

我在PySpark中有一个DataFrame,像这样的一列URI查询字符串(StringType):

+--------------+ 
| cs_uri_query |
+--------------+
| a=1&b=2&c=3  |
+--------------+
| d&e=&f=4     |
+--------------+

我需要在具有以下结构的StructField元素的ArrayType中转换此列:

ArrayType(StructType([StructField('key', StringType(), nullable=False),
                      StructField('value', StringType(), nullable=True)]))

我的预期专栏是这样的:

+------------------------------------------------------------+ 
| cs_uri_query                                               |
+------------------------------------------------------------+
| [{key=a, value=1},{key=b, value=2},{key=c, value=3}]       |
+------------------------------------------------------------+
| [{key=d, value=null},{key=e, value=null},{key=f, value=4}] |
+------------------------------------------------------------+

UDF是我发现实现此目标的唯一方法。我使用的是纯Spark函数,如果可能,我想避免使用UDF ...与在Scala lang上使用Spark不同,UDF在PySpark上的性能非常差。

这是我使用UDF的代码:

def parse_query(query):
    args = None
    if query:
        args = []
        for arg in query.split("&"):
            if arg:
                if "=" in arg:
                    a = arg.split("=")
                    if a[0]:
                        v = a[1] if a[1] else None
                        args.append({"key": a[0], "value": v})
                else:
                    args.append({"key": arg, "value": None})
    return args

uri_query = ArrayType(StructType([StructField('key', StringType(), nullable=True),
                                  StructField('value', StringType(), nullable=True)]))

udf_parse_query = udf(lambda args: parse_query(args), uri_query)

df = df.withColumn("cs_uri_query", udf_parse_query(df["cs_uri_query"]))

有人能够以惊人的解决方案让我睁开眼睛吗?

dataframe apache-spark pyspark pyspark-sql url-parsing
1个回答
1
投票

对于Spark 2.4+,可以通过split&,然后使用transform函数将每个元素transform转换为key=value

struct(key, value)

编辑

如果要过滤出为空或为空的键,则可以将from pyspark.sql.functions import expr df = spark.createDataFrame([("a=1&b=2&c=3",), ("d&e=&f=4",)], ["cs_uri_query"]) transform_expr = """transform(split(cs_uri_query, '&'), x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value) ) """ df.withColumn("cs_uri_query", expr(transform_expr)).show(truncate=False) #+------------------------+ #|cs_uri_query | #+------------------------+ #|[[a, 1], [b, 2], [c, 3]]| #|[[d,], [e, ], [f, 4]] | #+------------------------+ 与上面的转换表达式一起使用:

filter
© www.soinside.com 2019 - 2024. All rights reserved.