I have a function that calculates the RMSE between the predictions and actuals of an entire dataframe:
def calculate_rmse(df, actual_column, prediction_column):
    RMSE = F.udf(lambda x, y: ((x - y) ** 2))
    df = df.withColumn(
        "RMSE", RMSE(F.col(actual_column), F.col(prediction_column))
    )
    rmse = df.select(F.avg("RMSE") ** 0.5).collect()
    rmse = rmse[0]["POWER(avg(RMSE), 0.5)"]
    return rmse
test = calculate_rmse(my_df, 'actuals', 'preds')
3690.4535
I would like to apply this to a groupby statement, but when I do, I get the following:
df_gb = my_df.groupby('start_month', 'start_week').agg(calculate_rmse(my_df, 'actuals', 'preds'))
all exprs should be Column
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/group.py", line 113, in agg
assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
AssertionError: all exprs should be Column
Can someone point me in the right direction? I am new to Pyspark.
If you want to compute the RMSE by group, here is a slight adaptation of the solution I proposed to your question:
import pyspark.sql.functions as psf

def compute_RMSE(expected_col, actual_col):
    # old_df is the input DataFrame defined elsewhere
    rmse = (old_df.withColumn("squarederror",
                              psf.pow(psf.col(actual_col) - psf.col(expected_col),
                                      psf.lit(2)))
            .groupby('start_month', 'start_week')
            # RMSE is the root of the *mean* squared error, so use avg, not sum
            .agg(psf.avg(psf.col("squarederror")).alias("mse"))
            .withColumn("rmse", psf.sqrt(psf.col("mse"))))
    return rmse
compute_RMSE("col1", "col2")
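As a quick sanity check of the per-group math (no Spark required), here is a pure-Python sketch with hypothetical (group, actual, predicted) rows; the function name and data are made up for illustration:

```python
from collections import defaultdict
from math import sqrt

def grouped_rmse(rows):
    """Compute RMSE per group from (group, actual, predicted) tuples."""
    sq_errors = defaultdict(list)
    for group, actual, pred in rows:
        sq_errors[group].append((actual - pred) ** 2)
    # RMSE = sqrt(mean of the squared errors) within each group
    return {g: sqrt(sum(e) / len(e)) for g, e in sq_errors.items()}

rows = [("a", 3.0, 1.0), ("a", 1.0, 3.0), ("b", 5.0, 5.0)]
grouped_rmse(rows)  # {"a": 2.0, "b": 0.0}
```

Group "a" has squared errors 4 and 4, so its RMSE is sqrt(4) = 2.0, which is what the grouped Spark aggregation above should reproduce.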
I don't think you need a UDF for this. You should be able to compute the difference between the two columns (df.withColumn('difference', col('true') - col('pred'))), then the square of that column (df.withColumn('squared_difference', pow(col('difference'), lit(2).astype(IntegerType())))), and then the average of that column; note that avg is an aggregate function, so it belongs in an agg or select rather than a withColumn (df.agg(avg('squared_difference'))). Putting it together with an example: