Pyspark:通过搜索字典替换列中的值

问题描述 投票:0回答:7

我是 PySpark 的新手。

我有一个 Spark

DataFrame
df
,其中有一列“device_type”。

我想将“Tablet”或“Phone”中的每个值替换为“Phone”,并将“PC”替换为“Desktop”。

在Python中我可以执行以下操作,

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df['device_type'] = df['device_type'].replace(deviceDict,inplace=False)

如何使用 PySpark 实现这一目标?谢谢!

python apache-spark dataframe pyspark apache-spark-sql
7个回答
35
投票

您可以使用

na.replace
:

df = spark.createDataFrame([
    ('Tablet', ), ('Phone', ),  ('PC', ), ('Other', ), (None, )
], ["device_type"])

df.na.replace(deviceDict, 1).show()
+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|      Other|
|       null|
+-----------+

或地图文字:

from itertools import chain
from pyspark.sql.functions import create_map, lit

mapping = create_map([lit(x) for x in chain(*deviceDict.items())])


df.select(mapping[df['device_type']].alias('device_type'))
+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|       null|
|       null|
+-----------+

请注意,后一种解决方案会将映射中不存在的值转换为

NULL
。如果这不是您想要的行为,您可以添加
coalesce
:

from pyspark.sql.functions import coalesce


df.select(
    coalesce(mapping[df['device_type']], df['device_type']).alias('device_type')
)
+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|      Other|
|       null|
+-----------+

21
投票

经过大量搜索和替代方案,我认为使用 python dict 替换的最简单方法是使用 pyspark dataframe 方法

replace
:

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df_replace = df.replace(deviceDict,subset=['device_type'])

这会将所有值替换为 dict,如果您传递 dict 参数与子集参数相结合,则可以使用

df.na.replace()
获得相同的结果。在他的docs上还不够清楚,因为如果你搜索函数
replace
,你会得到两个引用,一个在
pyspark.sql.DataFrame.replace
内部,另一个在
pyspark.sql.DataFrameNaFunctions.replace
侧面,但是两个引用的示例代码都使用
 df.na.replace
所以不清楚你实际上可以使用
df.replace


9
投票

这是一个小辅助函数,受到 R

recode
函数的启发,它抽象了之前的答案。作为奖励,它添加了默认值的选项。

from itertools import chain
from pyspark.sql.functions import col, create_map, lit, when, isnull
from pyspark.sql.column import Column

df = spark.createDataFrame([
    ('Tablet', ), ('Phone', ),  ('PC', ), ('Other', ), (None, )
], ["device_type"])

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}

df.show()
+-----------+
|device_type|
+-----------+
|     Tablet|
|      Phone|
|         PC|
|      Other|
|       null|
+-----------+

这是

recode
的定义。

def recode(col_name, map_dict, default=None):
    if not isinstance(col_name, Column): # Allows either column name string or column instance to be passed
        col_name = col(col_name)
    mapping_expr = create_map([lit(x) for x in chain(*map_dict.items())])
    if default is None:
        return  mapping_expr.getItem(col_name)
    else:
        return when(~isnull(mapping_expr.getItem(col_name)), mapping_expr.getItem(col_name)).otherwise(default)

创建没有默认值的列会在所有不匹配的值中给出

null
/
None

df.withColumn("device_type", recode('device_type', deviceDict)).show()

+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|       null|
|       null|
+-----------+

另一方面,指定

default
的值会将所有不匹配的值替换为此默认值。

df.withColumn("device_type", recode('device_type', deviceDict, default='Other')).show()

+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|      Other|
|      Other|
+-----------+

8
投票

您也可以使用

df.withColumn
来完成此操作:

from itertools import chain
from pyspark.sql.functions import create_map, lit

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}

mapping_expr = create_map([lit(x) for x in chain(*deviceDict.items())])

df = df.withColumn('device_type', mapping_expr[df['dvice_type']])
df.show()

6
投票

最简单的方法是在数据框上应用

udf
:

    from pyspark.sql.functions import col , udf

    deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
    map_func = udf(lambda row : deviceDict.get(row,row))
    df = df.withColumn("device_type", map_func(col("device_type")))

2
投票

解决此问题的另一种方法是在传统 sql 中使用

CASE WHEN
,但使用
f-strings
并使用 python 字典和
.join
来自动生成
CASE WHEN
语句:

column = 'device_type' #column to replace

e = f"""CASE {' '.join([f"WHEN {column}='{k}' THEN '{v}'" 
             for k,v in deviceDict.items()])} ELSE {column} END"""

df.withColumn(column,F.expr(e)).show()

+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|      Other|
|       null|
+-----------+

注意: 如果要在键不匹配的情况下返回

NULL
,只需在变量
ELSE {column} END
的case语句中将
ELSE NULL END
更改为
e

即可
column = 'device_type' #column to replace

e = f"""CASE {' '.join([f"WHEN {column}='{k}' THEN '{v}'" 
             for k,v in deviceDict.items()])} ELSE NULL END"""

df.withColumn('New_Col',F.expr(e)).show()

+-----------+-------+
|device_type|New_Col|
+-----------+-------+
|     Tablet| Mobile|
|      Phone| Mobile|
|         PC|Desktop|
|      Other|   null|
|       null|   null|
+-----------+-------+

0
投票

我发现的最好的方法是:

df.replace(list(deviceDict.keys()), list(deviceDict.values()), 'device_type')

© www.soinside.com 2019 - 2024. All rights reserved.