我有一个 PySpark 数据框,如下所示:
身份证 | col1 | col2 | colA | colB |
---|---|---|---|---|
id_1 | %colA | t< %colA | int1 | int3 |
Id_2 | %colB | t< %colB | int2 | int4 |
我想将以 % 开头的字符串替换为其相应的列值,如下所示。
身份证 | col1 | col2 |
---|---|---|
id_1 | int1 | t< int1 |
Id_2 | int4 | t< int4 |
如果我可以循环遍历行,这是可能的。
但是有没有一种有效的方法来做到这一点?
这是一种可能的解决方案,我们可以利用正则表达式的效率,而且当我们使用 Spark 函数时,与编写用户定义的迭代解决方案相比,Sparks 内部优化器也将有助于提高效率。
dfSchema = StructType([
StructField("id", StringType(), True),
StructField("col1", StringType(), True),
StructField("col2", StringType(), True),
StructField("colA", IntegerType(), True),
StructField("colB", IntegerType(), True),
])
df = spark.createDataFrame([
["id_1", "%colA", "t < %colA", 10, 20],
["id_2", "%colB", "t < %colB", 30, 40]
], schema=dfSchema)
df.show()
from pyspark.sql.functions import regexp_extract, col, when, regexp_replace
df.withColumn("col3", regexp_extract(col("col1"), r"(?<=\%).*", 0)) \
.withColumn("col1", when(col("col3") == "colA", col("colA")).when(col("col3") == "colB", col("colB"))) \
.withColumn("col4", regexp_extract(col("col2"), r"(?<=\%)(.*)", 1))\
.withColumn("col5", when(col("col4") == "colA", col("colA")).when(col("col4") == "colB", col("colB")))\
.withColumn("col2", regexp_replace(col("col2"), r"%(.*)[^\s]", col("col5")))\
.show()
+----+-----+---------+----+----+
| id| col1| col2|colA|colB|
+----+-----+---------+----+----+
|id_1|%colA|t < %colA| 10| 20|
|id_2|%colB|t < %colB| 30| 40|
+----+-----+---------+----+----+
+----+----+------+----+----+----+----+----+
| id|col1| col2|colA|colB|col3|col4|col5|
+----+----+------+----+----+----+----+----+
|id_1| 10|t < 10| 10| 20|colA|colA| 10|
|id_2| 40|t < 40| 30| 40|colB|colB| 40|
+----+----+------+----+----+----+----+----+
保留临时列以供解释。你可以扔掉它。