我在 PySpark 中有一个 for 循环,它可以迭代超市的不同产品,但需要很长时间。我知道 for 循环在 Spark 中效率很低,但我不知道如何在不使用 for 循环的情况下做到这一点。
from sklearn.linear_model import LinearRegression
for i in products:
# selecting the products
df = df.filter(F.col("product") == i)
df_train = df.filter(F.col("pvp_discount").isin([0.2, 0.5, 0.75]))
# exponential regression
exp = LinearRegression().fit(X, np.log(Y_delta))
exp_pred = np.exp(exp.predict(X_test))
# preparing to add the predictions to my df
schema_exp_delta = StructType([StructField("delta_exp_pred", DoubleType(), True)])
exp_delta = spark.createDataFrame(exp_pred.tolist(), schema = schema_exp_delta)
exp_delta = exp_delta.withColumn("row_idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
df = df.join(exp_delta, df.row_idx == exp_delta.row_idx)
任何评论都将受到高度赞赏。
谢谢!
您可以使用 Spark 的内置函数按产品对数据进行分组,并将计算并行应用于每个组,而无需使用 for 循环。实现此目的的一种方法是使用
groupBy
和 mapInPandas
函数。
from sklearn.linear_model import LinearRegression
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
# Define a UDF
@pandas_udf(schema_exp_delta, PandasUDFType.GROUPED_MAP)
def exponential_regression(pdf):
X, Y = pdf[['x_col']], pdf[['y_col']]
exp = LinearRegression().fit(X, np.log(Y))
exp_pred = np.exp(exp.predict(X_test))
schema_exp_delta = StructType([StructField("delta_exp_pred", DoubleType(), True)])
exp_delta = spark.createDataFrame(exp_pred.tolist(), schema = schema_exp_delta)
exp_delta = exp_delta.withColumn("row_idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
return exp_delta
# Group the data and apply regression to each group
df = df.filter(F.col("pvp_discount").isin([0.2, 0.5, 0.75]))
df = df.groupby('product').apply(exponential_regression)