我正在尝试在我的 pyspark 分析中实施预测模型,但出现以下错误。请准确地帮助我,我们可能需要应用更改或解决问题。
错误:
PythonException: 'ValueError: Dataframe has less than 2 non-NaN rows.', 来自 ,第 17 行。下面是完整的回溯: org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 367.0 中的任务 0 失败了 4 次,最近一次失败:任务丢失 0.3 阶段 367.0 (TID 541) (172.26.145.6 executor 0): org.apache.spark.api.python.PythonException: 'ValueError: Dataframe 少于 2 个非 NaN 行。', from , line 17. 下面的完整回溯:回溯(最近一次调用最后一次):文件“”,第 17 行,在 pd_apply_forecast 文件中 “/databricks/python/lib/python3.8/site-packages/prophet/forecaster.py”, 第 1113 行,适合 raise ValueError('Dataframe has less than 2 non-NaN rows.') ValueError: Dataframe has less than 2 non-NaN rows.
我的代码:
import re
from datetime import date, timedelta

import pandas as pd
import pyspark.pandas as ps
from prophet import Prophet
from pyspark.sql import DataFrame
from pyspark.sql.functions import coalesce, col, expr, lit
from pyspark.sql.types import (
    DateType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
def run_row_outlier_check(df: DataFrame, min_date, start_date, groupby_cols, job_id) -> DataFrame:
    """
    Forecast daily row counts per group with Prophet and flag outlier days.

    Parameters
    ----------
    df : DataFrame
        Source Spark DataFrame; one row per record, dated by the
        module-level ``date_col`` column.
    min_date : date-like
        First date of the dense calendar used as the forecast history.
    start_date : str
        Fallback lower bound for the recent-activity filter.
    groupby_cols : str | list
        Grouping column(s) for the daily counts.
        NOTE(review): most of this body uses the module-level
        ``groupby_col`` (singular) instead -- TODO confirm which is intended.
    job_id
        Identifier forwarded to ``add_metadata_columns``.

    Returns
    -------
    tuple(DataFrame, DataFrame)
        (full per-group forecast, outlier rows), each with metadata columns.
    """
    # Output schema of the per-group pandas UDF (Prophet's standard columns).
    pd_schema = StructType([
        StructField(groupby_col, StringType(), True),
        StructField("ds", DateType(), True),
        StructField("y", IntegerType(), True),
        StructField("yhat", FloatType(), True),
        StructField("yhat_lower", FloatType(), True),
        StructField("yhat_upper", FloatType(), True),
        StructField("trend", FloatType(), True),
        StructField("trend_lower", FloatType(), True),
        StructField("trend_upper", FloatType(), True),
        StructField("additive_terms", FloatType(), True),
        StructField("additive_terms_lower", FloatType(), True),
        StructField("additive_terms_upper", FloatType(), True),
        StructField("weekly", FloatType(), True),
        StructField("weekly_lower", FloatType(), True),
        StructField("weekly_upper", FloatType(), True),
        StructField("yearly", FloatType(), True),
        StructField("yearly_lower", FloatType(), True),
        StructField("yearly_upper", FloatType(), True),
        StructField("multiplicative_terms", FloatType(), True),
        StructField("multiplicative_terms_lower", FloatType(), True),
        StructField("multiplicative_terms_upper", FloatType(), True)
    ])

    # Dense calendar: one row per date from min_date through yesterday.
    df_rundates = (
        ps.DataFrame({'date': pd.date_range(start=min_date, end=(date.today() - timedelta(days=1)))})
    ).to_spark()

    # Distinct group values with recent activity, ordered by volume.
    df_bizlist = (
        df.filter(f"{date_col} >= coalesce(date_sub(date 'today', {num_days_check}), '{start_date}')")
        .groupBy(groupby_col)
        .count()
        .orderBy(col("count").desc())
    )

    # Cross join: one row per (group value, date) pair.
    df_rundates_bus = (
        df_rundates
        .join(df_bizlist, how='full')
        .select(df_bizlist[groupby_col], df_rundates["date"].alias("ds"))
    )

    # Actual daily counts left-joined onto the dense calendar; missing days
    # become y=0 so every group has a gap-free series for Prophet.
    df_grouped_cnt = df.groupBy(groupby_cols).count()
    df_input = (
        df_rundates_bus.selectExpr(f"{groupby_col}", "to_date(ds) as ds")
        .join(
            df_grouped_cnt.selectExpr(f"{groupby_col}", f"{date_col} as ds", "count as y"),
            on=['ds', f'{groupby_col}'], how='left'
        )
        .withColumn("y", coalesce("y", lit(0)))
        # FIX: drop NULL group keys. The full join above can emit rows with a
        # NULL group value; inside pd_apply_forecast the notnull() filter then
        # empties the frame and Prophet raises
        # "ValueError: Dataframe has less than 2 non-NaN rows."
        .filter(col(groupby_col).isNotNull())
        .repartition(sc.defaultParallelism, "ds")
    )

    # Per-group Prophet forecast via a grouped pandas UDF.
    df_forecast = (
        df_input
        .groupBy(groupby_col)
        .applyInPandas(pd_apply_forecast, schema=pd_schema)
    )

    # Keep only observed days whose count lies outside the forecast band;
    # deduct_score scales the deviation by the band width.
    df_rowoutliers = (
        df_forecast
        .filter("y > 0 AND (y > yhat_upper OR y < array_max(array(yhat_lower,0)))")
        .withColumn("check_type", lit("row_count"))
        .withColumn("deduct_score", expr("round(sqrt(pow(y-yhat, 2) / pow(yhat_lower - yhat_upper,2)))").cast('int'))
        .select(
            col("check_type"),
            col("ds").alias("ref_date"),
            col(groupby_col).alias("ref_dimension"),
            col("y").cast('int').alias("actual"),
            col("deduct_score"),
            col("yhat").alias("forecast"),
            col("yhat_lower").alias("forecast_lower"),
            col("yhat_upper").alias("forecast_upper")
        )
    )
    return add_metadata_columns(df_forecast, job_id), add_metadata_columns(df_rowoutliers, job_id)
def pd_apply_forecast(pd_history: pd.DataFrame) -> pd.DataFrame:
    """
    Fit Prophet on one group's daily history and return forecast + actuals.

    Intended to run under ``DataFrame.groupBy(...).applyInPandas``; expects
    columns ``ds`` (date), ``y`` (count) and the module-level ``groupby_col``.
    Returns one row per forecast date (history + 365 future days), restricted
    to rows whose group key is non-null.
    """
    # Column order must match the pd_schema declared by the caller.
    out_cols = [groupby_col, 'ds', 'y', 'yhat', 'yhat_lower', 'yhat_upper',
                'trend', 'trend_lower', 'trend_upper',
                'additive_terms', 'additive_terms_lower', 'additive_terms_upper',
                'weekly', 'weekly_lower', 'weekly_upper',
                'yearly', 'yearly_lower', 'yearly_upper',
                'multiplicative_terms', 'multiplicative_terms_lower', 'multiplicative_terms_upper']

    # Remove missing values and rows whose group key is null.
    pd_history = (pd_history[pd_history[groupby_col].notnull()]
                  .dropna())

    # FIX: Prophet.fit raises "ValueError: Dataframe has less than 2 non-NaN
    # rows." when a group has fewer than two usable observations (e.g. a
    # null-key group emptied by the filter above). Return an empty frame with
    # the expected columns instead of crashing the whole Spark stage.
    if len(pd_history) < 2:
        return pd.DataFrame(columns=out_cols)

    # Instantiate the model; daily seasonality disabled since data is daily counts.
    model = Prophet(
        growth='linear',
        yearly_seasonality='auto',   # default: auto
        weekly_seasonality='auto',   # default: auto
        daily_seasonality=False,     # default: auto
        seasonality_mode='additive'
    )
    # Fit on this group's history.
    model.fit(pd_history)

    # Forecast 365 days past the history, keeping historical dates too.
    pd_future = model.make_future_dataframe(
        periods=365,
        freq='d',
        include_history=True
    )
    pd_forecast = model.predict(pd_future)

    # Assemble the result: forecast columns joined (on ds) to actuals.
    pd_f = pd_forecast[out_cols[3:] + ['ds']].set_index('ds')
    pd_h = pd_history[['ds', groupby_col, 'y']].set_index('ds')
    pd_results = pd_f.join(pd_h, how='left')
    pd_results.reset_index(level=0, inplace=True)

    # Filter out null dimensions (future dates have no history to join to,
    # so their group key is NaN after the left join).
    pd_results = pd_results[pd_results[groupby_col].notnull()]
    return pd_results[out_cols]
如错误消息所述,Prophet 要求每个组至少有两行非 NaN 数据才能拟合模型。解决方法有两处:(1) 在调用 applyInPandas 之前过滤掉分组键为 NULL 的行(完全连接会产生这类行,它们在 UDF 内被 notnull() 过滤后变成空 DataFrame);(2) 在 pd_apply_forecast 内部加一个保护判断——当清洗后的历史数据少于 2 行时,直接返回一个列结构与输出 schema 一致的空 DataFrame,而不是调用 model.fit(),这样单个稀疏分组就不会导致整个 Spark 阶段失败。