将 Pandas 函数转换为 Pyspark 函数

问题描述 投票:0回答:1

我需要将以下代码转换为 Pyspark。

我知道如何在 Pyspark 中创建数据框

df_stack_exchange
,但不知道如何在 Pyspark 中创建等效的
assign_boxes
函数。任何帮助将不胜感激

data_stack_exchange = {'store': ['A','B', 'B', 'C', 'C', 'C', 'D', 'D', 'D', 'D'],
        'worker': [1,1,2,1,2,3,1,2,3,4],
        'boxes': [105, 90, 100, 80, 10, 200, 70, 210, 50, 0],
        'optimal_boxes': [0,0,0,0,0,0,0,0,0,0]}
df_stack_exchange = pandas.DataFrame(data_stack_exchange)

def assign_boxes(s):
    total = s.sum()
    d = min(total // 100, len(s)-1)
    return [100]*d+[total - 100*d]+[0]*(len(s)-d-1)

df['optimal_boxes'] = df.groupby('store')['boxes'].transform(assign_boxes)

我已经通过官方 Pyspark 文档和 50 多个 stackexchange 线程阅读了 UDF,但无法弄清楚

pandas dataframe apache-spark pyspark user-defined-functions
1个回答
0
投票

下面的方法有效,但我没有使用函数:

w0=Window.partitionBy("store")
sp_df=sp_df.withColumn("row",row_number().over(w0.orderBy(monotonically_increasing_id())))
sp_df=sp_df.withColumn("optimal_boxes",lit("100"))
sp_df=sp_df.withColumn("sum_boxes",sum(col("boxes")).over(w0))

sp_df=sp_df.withColumn("optimal_boxes",when(col("row")==max(col("row")).over(w0),\
                                           col("sum_boxes")-lag(sum(col("optimal_boxes")).over(w0.orderBy("row"))).over(w0.orderBy("row"))).otherwise(col("optimal_boxes")))
sp_df=sp_df.withColumn("optimal_boxes",when(col("optimal_boxes").isNull(),col("boxes")).otherwise(col("optimal_boxes")))
sp_df=sp_df.drop("sum_boxes","row")
sp_df.show()
© www.soinside.com 2019 - 2024. All rights reserved.