迭代列表以更新 PySpark df 列

问题描述 投票:0回答:1

需要根据条件通过迭代列表向 PySpark DF 添加新列。

new_line_id = 数组('a', 'b', 'c')

输入DF(LineID不是这个DF中的PK): |线路 ID | | --------| |米|
|空 | | T | |空 | |空 | | P|

列表包含的项目数量与线路 ID 中的 Null 数量完全相同。基本上只要 LineID 为空,就从列表中选择一个值。 所需输出: |线路 ID |新专栏| | --------| ----------| |中号 |中号 | |空 |一个 | | T | T | |空 | b |
|空 | c | |普 | P|

以下代码在每一行中添加整个列表。 不起作用的代码:

new_df = df.withColumn("new_col", when(df.line_id.isNull(), array([lit(x) for x in new_line_id]).cast(StringType()).otherwise(df.line_id)
python list pyspark apache-spark-sql iteration
1个回答
0
投票

你可以试试这个:

from pyspark.sql import Window
from pyspark.sql.functions import row_number, when, col, udf
from pyspark.sql.types import StringType

df.show()

windowSpec = Window.orderBy(col("LineID")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_with_row_num = df.withColumn("row_num", when(col("LineID").isNull(), row_number().over(windowSpec)).otherwise(None))

def assign_value(line_id, row_num):
    if line_id is None:
        return new_line_id[row_num - 1] # subtract 1 because Python list indexing starts at 0
    else:
        return line_id

assign_value_udf = udf(assign_value, StringType())

new_df = df_with_row_num.withColumn("new_col", assign_value_udf(col("LineID"), col("row_num")))

new_df.show()

© www.soinside.com 2019 - 2024. All rights reserved.