我是 PySpark 的新手。
我有一个 Spark
DataFrame
df
,其中有一列“device_type”。
我想将“Tablet”或“Phone”中的每个值替换为“Phone”,并将“PC”替换为“Desktop”。
在Python中我可以执行以下操作,
deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df['device_type'] = df['device_type'].replace(deviceDict,inplace=False)
如何使用 PySpark 实现这一目标?谢谢!
您可以使用
na.replace
:
df = spark.createDataFrame([
('Tablet', ), ('Phone', ), ('PC', ), ('Other', ), (None, )
], ["device_type"])
df.na.replace(deviceDict, 1).show()
+-----------+
|device_type|
+-----------+
| Mobile|
| Mobile|
| Desktop|
| Other|
| null|
+-----------+
或地图文字:
from itertools import chain
from pyspark.sql.functions import create_map, lit
mapping = create_map([lit(x) for x in chain(*deviceDict.items())])
df.select(mapping[df['device_type']].alias('device_type'))
+-----------+
|device_type|
+-----------+
| Mobile|
| Mobile|
| Desktop|
| null|
| null|
+-----------+
请注意,后一种解决方案会将映射中不存在的值转换为
NULL
。如果这不是您想要的行为,您可以添加 coalesce
:
from pyspark.sql.functions import coalesce
df.select(
coalesce(mapping[df['device_type']], df['device_type']).alias('device_type')
)
+-----------+
|device_type|
+-----------+
| Mobile|
| Mobile|
| Desktop|
| Other|
| null|
+-----------+
经过大量搜索和替代方案,我认为使用 python dict 替换的最简单方法是使用 pyspark dataframe 方法
replace
:
deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df_replace = df.replace(deviceDict,subset=['device_type'])
这会将所有值替换为 dict,如果您传递 dict 参数与子集参数相结合,则可以使用
df.na.replace()
获得相同的结果。在他的docs上还不够清楚,因为如果你搜索函数replace
,你会得到两个引用,一个在pyspark.sql.DataFrame.replace
内部,另一个在pyspark.sql.DataFrameNaFunctions.replace
侧面,但是两个引用的示例代码都使用 df.na.replace
所以不清楚你实际上可以使用df.replace
。
这是一个小辅助函数,受到 R
recode
函数的启发,它抽象了之前的答案。作为奖励,它添加了默认值的选项。
from itertools import chain
from pyspark.sql.functions import col, create_map, lit, when, isnull
from pyspark.sql.column import Column
df = spark.createDataFrame([
('Tablet', ), ('Phone', ), ('PC', ), ('Other', ), (None, )
], ["device_type"])
deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df.show()
+-----------+
|device_type|
+-----------+
| Tablet|
| Phone|
| PC|
| Other|
| null|
+-----------+
这是
recode
的定义。
def recode(col_name, map_dict, default=None):
if not isinstance(col_name, Column): # Allows either column name string or column instance to be passed
col_name = col(col_name)
mapping_expr = create_map([lit(x) for x in chain(*map_dict.items())])
if default is None:
return mapping_expr.getItem(col_name)
else:
return when(~isnull(mapping_expr.getItem(col_name)), mapping_expr.getItem(col_name)).otherwise(default)
创建没有默认值的列会在所有不匹配的值中给出
null
/None
。
df.withColumn("device_type", recode('device_type', deviceDict)).show()
+-----------+
|device_type|
+-----------+
| Mobile|
| Mobile|
| Desktop|
| null|
| null|
+-----------+
另一方面,指定
default
的值会将所有不匹配的值替换为此默认值。
df.withColumn("device_type", recode('device_type', deviceDict, default='Other')).show()
+-----------+
|device_type|
+-----------+
| Mobile|
| Mobile|
| Desktop|
| Other|
| Other|
+-----------+
您也可以使用
df.withColumn
来完成此操作:
from itertools import chain
from pyspark.sql.functions import create_map, lit
deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
mapping_expr = create_map([lit(x) for x in chain(*deviceDict.items())])
df = df.withColumn('device_type', mapping_expr[df['dvice_type']])
df.show()
最简单的方法是在数据框上应用
udf
:
from pyspark.sql.functions import col , udf
deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
map_func = udf(lambda row : deviceDict.get(row,row))
df = df.withColumn("device_type", map_func(col("device_type")))
解决此问题的另一种方法是在传统 sql 中使用
CASE WHEN
,但使用 f-strings
并使用 python 字典和 .join
来自动生成 CASE WHEN
语句:
column = 'device_type' #column to replace
e = f"""CASE {' '.join([f"WHEN {column}='{k}' THEN '{v}'"
for k,v in deviceDict.items()])} ELSE {column} END"""
df.withColumn(column,F.expr(e)).show()
+-----------+
|device_type|
+-----------+
| Mobile|
| Mobile|
| Desktop|
| Other|
| null|
+-----------+
注意: 如果要在键不匹配的情况下返回
NULL
,只需在变量ELSE {column} END
的case语句中将ELSE NULL END
更改为e
即可
column = 'device_type' #column to replace
e = f"""CASE {' '.join([f"WHEN {column}='{k}' THEN '{v}'"
for k,v in deviceDict.items()])} ELSE NULL END"""
df.withColumn('New_Col',F.expr(e)).show()
+-----------+-------+
|device_type|New_Col|
+-----------+-------+
| Tablet| Mobile|
| Phone| Mobile|
| PC|Desktop|
| Other| null|
| null| null|
+-----------+-------+
我发现的最好的方法是:
df.replace(list(deviceDict.keys()), list(deviceDict.values()), 'device_type')