我有一个以下格式的 PySpark 数据框(records_002_df),并且将来会添加更多列 -
+-----------+-----------------+-------------+
|RECORD_TYPE| CLAIM_NUMBER|RECEIVED_DATE|
+-----------+-----------------+-------------+
| 002| 23E002113200| 08/30/2023|
| 002| 23P001125500| 05/30/2023|
| 002| 23E002114300| 01/30/2024|
| 002|20223124002830199| 12/31/2022|
| 002|20223124003270199| 12/31/2022|
| 002|20223493004410199| 12/31/2022|
我需要对每一列执行单独的转换,对于其中一个我尝试如下 -
trans_df=records_002_df.withColumn('RECORD_TYPE',when(records_002_df['RECORD_TYPE'] == '002','In-Network').otherwise('Out-Of-Network'))
上面给了我预期的数据帧,并在 RECORD_TYPE 字段上进行了所需的转换。 但我有不同的转换应用于其他列。还希望将转换逻辑保留在单独的模块中,以便 Spark 脚本应该是通用的。 请提出一些想法,我如何才能实现这一目标。 谢谢!!
将转换放入字典中
在列表达式中,不应直接引用数据框,而应通过其列名称引用输入数据。
from pyspark.sql import functions as F
transformations = {"RECORD_TYPE": F.when(F.col("RECORD_TYPE") == '002','In-Network')
.otherwise('Out-Of-Network'),
"NEXT_DAY": F.date_add(F.to_date("RECEIVED_DATE", "MM/dd/yyyy"), 1)
}
这个字典现在可以应用于数据框:
records_002_df.select([F.col(name) for name in df1.columns - transformations.keys()] # unchanged columns
+ [ transformation.alias(name) for (name,transformation) # transformed columns
in transformations.items()]) \
.show()
结果:
+-------------+-------------+-----------+----------+
|RECEIVED_DATE| CLAIM_NUMBER|RECORD_TYPE| NEXT_DAY|
+-------------+-------------+-----------+----------+
| 08/30/2023| 23E002113200| In-Network|2023-08-31|
| 05/30/2023|J23P001125500| In-Network|2023-05-31|
| 01/30/2024| 23E002114300| In-Network|2024-01-31|
+-------------+-------------+-----------+----------+
transformations
字典没有对原始数据框的任何引用,因此它的创建可以移动到另一个模块。