通过机器学习避免 PySpark 中的 for 循环

问题描述 投票:0回答:1

我在 PySpark 中有一个 for 循环,它可以迭代超市的不同产品,但需要很长时间。我知道 for 循环在 Spark 中效率很低,但我不知道如何在不使用 for 循环的情况下做到这一点。

from sklearn.linear_model import LinearRegression
for i in products:
    # selecting the products
    df = df.filter(F.col("product") == i)
    df_train = df.filter(F.col("pvp_discount").isin([0.2, 0.5, 0.75]))

    # exponential regression
    exp = LinearRegression().fit(X, np.log(Y_delta))
    exp_pred = np.exp(exp.predict(X_test))

    # preparing to add the predictions to my df
    schema_exp_delta = StructType([StructField("delta_exp_pred", DoubleType(), True)])
    exp_delta = spark.createDataFrame(exp_pred.tolist(), schema = schema_exp_delta)
    exp_delta = exp_delta.withColumn("row_idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))

    df = df.join(exp_delta, df.row_idx == exp_delta.row_idx)

任何评论都将受到高度赞赏。

谢谢!

apache-spark pyspark scikit-learn databricks
1个回答
0
投票

您可以使用 Spark 的内置函数按产品对数据进行分组,并将计算并行应用于每个组,而无需使用 for 循环。实现此目的的一种方法是使用

groupBy
mapInPandas
函数。

from sklearn.linear_model import LinearRegression
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Define a UDF
@pandas_udf(schema_exp_delta, PandasUDFType.GROUPED_MAP)
def exponential_regression(pdf):
   X, Y = pdf[['x_col']], pdf[['y_col']]
   exp = LinearRegression().fit(X, np.log(Y))
   exp_pred = np.exp(exp.predict(X_test))
   schema_exp_delta = StructType([StructField("delta_exp_pred", DoubleType(), True)])
   exp_delta = spark.createDataFrame(exp_pred.tolist(), schema = schema_exp_delta)
   exp_delta = exp_delta.withColumn("row_idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
   return exp_delta

# Group the data and apply regression to each group
df = df.filter(F.col("pvp_discount").isin([0.2, 0.5, 0.75]))
df = df.groupby('product').apply(exponential_regression)
© www.soinside.com 2019 - 2024. All rights reserved.