如何使用 PySpark 将另一列中包含的列名称替换为该列的值?

问题描述 投票:0回答:1

我有一个 PySpark 数据框,如下所示:

身份证 col1 col2 colA colB
id_1 %colA t< %colA int1 int3
Id_2 %colB t< %colB int2 int4

我想将以 % 开头的字符串替换为其相应的列值,如下所示。

身份证 col1 col2
id_1 int1 t< int1
Id_2 int4 t< int4

如果我可以循环遍历行,这是可能的。

但是有没有一种有效的方法来做到这一点?

python dataframe apache-spark pyspark apache-spark-sql
1个回答
0
投票

这是一种可能的解决方案,我们可以利用正则表达式的效率,而且当我们使用 Spark 函数时,与编写用户定义的迭代解决方案相比,Sparks 内部优化器也将有助于提高效率。

dfSchema = StructType([
    StructField("id", StringType(), True),
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("colA", IntegerType(), True),
    StructField("colB", IntegerType(), True),
])
df = spark.createDataFrame([
    ["id_1", "%colA", "t < %colA", 10, 20],
    ["id_2", "%colB", "t < %colB", 30, 40]
], schema=dfSchema)

df.show()

from pyspark.sql.functions import regexp_extract, col, when, regexp_replace
df.withColumn("col3", regexp_extract(col("col1"), r"(?<=\%).*", 0)) \
    .withColumn("col1", when(col("col3") == "colA", col("colA")).when(col("col3") == "colB", col("colB"))) \
    .withColumn("col4", regexp_extract(col("col2"), r"(?<=\%)(.*)", 1))\
    .withColumn("col5", when(col("col4") == "colA", col("colA")).when(col("col4") == "colB", col("colB")))\
    .withColumn("col2", regexp_replace(col("col2"), r"%(.*)[^\s]", col("col5")))\
.show()
+----+-----+---------+----+----+
|  id| col1|     col2|colA|colB|
+----+-----+---------+----+----+
|id_1|%colA|t < %colA|  10|  20|
|id_2|%colB|t < %colB|  30|  40|
+----+-----+---------+----+----+

+----+----+------+----+----+----+----+----+
|  id|col1|  col2|colA|colB|col3|col4|col5|
+----+----+------+----+----+----+----+----+
|id_1|  10|t < 10|  10|  20|colA|colA|  10|
|id_2|  40|t < 40|  30|  40|colB|colB|  40|
+----+----+------+----+----+----+----+----+

保留临时列以供解释。你可以扔掉它。

© www.soinside.com 2019 - 2024. All rights reserved.