如何在pyspark的多列中循环多个衰减率?

问题描述 投票:0回答:1

我试图在我的函数的参数中传递一个列表。

我的列表是由不同的系数组成的,要应用于滞后的许多列。

然而,我只在我的数据框架中生成了列表中第一个值的列。

这是我的实际结果。

"col1","col2","col1_0.2","col2_0.2"

什么是预期:

"col1","col2","col1_0.2","col2_0.2","col1_0.4","col2_0.4","col1_0.6","col2_0.6"

我一定是在我的循环中漏掉了一些东西?

selected_col = col_selector(df, ["col1", "col2"])


w = Window.partitionBy("student").orderBy("date")
coef = (.1,.4,.6)

def custom_coef(col, w, coef):
    for x in coef:
        return sum(
            pow(i, x) * F.lag(F.col(col), i, default=0).over(w)
            for i in range(1)
        ).alias(col +"_"+str(x))

new_df = df.select(
    F.col("*"),
    *[custom_coef(col, w, coef) for col in selected_col]
)

谢谢

loops apache-spark pyspark lag pow
1个回答
1
投票

列表中的 return 声明中 custom_coef 函数在第一次执行完循环后结束函数。coef. 这意味着 custom_coef 将总是返回第一个列的定义,这就是coef 0.1的列定义。由于函数在 selected_col 你会得到你所描述的结果。

解决这个问题的一个方法是在不改变代码结构的情况下,将以下代码替换为 returnyield. 这样一来 custom_coef 的每个元素创建一个生成器。selected_col. 这些发电机可以与 itertools.chain 而这个结果可以被用作 select 声明:

def custom_coef(col, w, coef):
    for x in coef:
        yield sum(  #use yield instead of return
            pow(i, x) * F.lag(F.col(col), i, default=0).over(w)
            for i in range(1)
        ).alias(col +"_"+str(x))

new_df = df.select(
    F.col("*"),
    *chain(*[custom_coef(col, w, coef) for col in selected_col]) #chain the generators
)
new_df.show()
© www.soinside.com 2019 - 2024. All rights reserved.