我正在尝试创建一个预测出租车需求的模型。我希望模型从数据集中前 7 周的数据中学习,然后在第 8 周的数据上进行测试。第 8 周的数据对模型来说应该是未见过的。然而,我的图表显示的情况并非如此:预测需求与实际需求几乎完全相同。我哪里做错了?
def build_model(input_shape):
    """Build and compile the GRU network used for one-step demand forecasting.

    The network is two GRU layers of 80 units, each followed by 10% dropout,
    and a single-unit dense output. It is compiled with the Adam optimizer
    and mean-squared-error loss.

    Args:
        input_shape: Shape of one input sample, forwarded to the first GRU
            layer. The caller passes ``(window_length, n_features)``.

    Returns:
        The compiled model.
    """
    layers = [
        # The first recurrent layer emits the full sequence so the second
        # GRU layer can consume it.
        GRU(80, return_sequences=True, input_shape=input_shape),
        Dropout(0.1),
        GRU(80),
        Dropout(0.1),
        # One output value: the prediction for a single future time step.
        Dense(1),
    ]
    network = Sequential(layers)
    network.compile(optimizer='adam', loss='mse')
    return network
def prepare_data(file_path, location_id, input_day_of_week, window_size=12):
    """Load one location's demand series and build train/test sliding windows.

    The CSV is indexed by ``tpep_pickup_datetime``. Only rows that fall on
    ``input_day_of_week`` are kept. Rows whose timestamp is within the final
    week of the filtered data form the test period; everything earlier is the
    training period.

    Each sample is ``window_size`` consecutive scaled values, and its target
    is the single value that follows the window. A window belongs to the
    training split when its target lies in the training period, so no
    training target comes from the final week. Test inputs are always real
    observations (the first test windows reach back into the end of the
    training period), which makes the evaluation one-step-ahead rather than
    a recursive multi-step forecast.

    Args:
        file_path: Path of the CSV file to read.
        location_id: Name of the column holding the demand series.
        input_day_of_week: Day of week to keep (0=Monday, ..., 6=Sunday).
        window_size: Number of past time steps in each input window.

    Returns:
        ``(X_train, y_train, X_test, y_test, scaler)``. ``X_*`` have shape
        ``(n_samples, window_size, 1)`` and ``y_*`` have shape
        ``(n_samples, 1)``. ``scaler`` is the ``MinMaxScaler`` fitted on the
        training period. Five ``None`` values are returned when the column
        is missing or when the training period is too short to form a
        single window.
    """
    df = pd.read_csv(file_path, index_col='tpep_pickup_datetime')
    df.index = pd.to_datetime(df.index)

    if location_id not in df.columns:
        print(f"Location ID {location_id} not found in the dataset.")
        return None, None, None, None, None

    # Select the weekday rows directly instead of adding a helper column to a
    # sliced frame, which can trigger pandas' SettingWithCopyWarning.
    series = df.loc[df.index.dayofweek == input_day_of_week, [location_id]]

    # The split below is positional, so the rows must be in chronological
    # order. A stable sort keeps the file order of any duplicate timestamps.
    series = series.sort_index(kind="mergesort")

    cutoff_date = series.index.max() - pd.Timedelta(weeks=1)
    n_train_rows = int((series.index <= cutoff_date).sum())

    # With too little history the split size would be zero or negative, and a
    # negative value would make the slices below cut from the wrong end.
    if n_train_rows <= window_size:
        return None, None, None, None, None

    # Fit the scaler on the training period only, so the min/max of the
    # held-out week never influence preprocessing. Test values may therefore
    # fall slightly outside [0, 1].
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(series.iloc[:n_train_rows])
    scaled = scaler.transform(series)

    n_windows = len(scaled) - window_size
    X = np.array([scaled[i:i + window_size] for i in range(n_windows)])
    y = np.array([scaled[i + window_size] for i in range(n_windows)])

    # Window i has its target at row i + window_size, so the first
    # (n_train_rows - window_size) windows have targets in the training period.
    train_size = n_train_rows - window_size
    X_train, y_train = X[:train_size], y[:train_size]
    X_test, y_test = X[train_size:], y[train_size:]
    return X_train, y_train, X_test, y_test, scaler
def main():
    """Prompt for a location and weekday, train the model and plot test results.

    Reads ``combined.csv``, trains for 50 epochs on the training windows,
    evaluates on the test windows, prints the test loss, RMSE (in original
    demand units) and elapsed time, then plots actual against predicted
    demand. Prints a message and stops when no training data is available.
    """
    data_path = 'combined.csv'
    location_id = input("Enter Location ID: ")
    day_of_week = int(input("Enter Day of the Week (0=Monday, ..., 6=Sunday): "))

    # The timer starts after the prompts, so it excludes time spent waiting
    # for user input.
    start_time = time.time()
    splits = prepare_data(data_path, location_id, day_of_week)
    X_train, y_train, X_test, y_test, scaler = splits

    # Guard clause: nothing to train on.
    if X_train is None or X_train.shape[0] <= 0:
        print("Insufficient data to train the model.")
        return

    window_len, n_features = X_train.shape[1], X_train.shape[2]
    model = build_model((window_len, n_features))
    model.fit(X_train, y_train, epochs=50, verbose=1)

    test_loss = model.evaluate(X_test, y_test)
    print(f"Test Loss: {test_loss}")

    # Convert predictions and targets back to original demand units before
    # computing the error, so the RMSE is in those units.
    scaled_predictions = model.predict(X_test)
    predicted = scaler.inverse_transform(scaled_predictions)
    actual = scaler.inverse_transform(y_test)
    rmse = np.sqrt(mean_squared_error(actual, predicted))
    print(f"Root Mean Squared Error: {rmse}")

    end_time = time.time()
    print(f"Execution time: {end_time - start_time:.2f} seconds")

    # Label consecutive test steps as half-hour slots: 00:00, 00:30, 01:00, ...
    time_labels = []
    for step in range(len(actual)):
        hour, is_half_past = divmod(step, 2)
        time_labels.append(f"{str(hour).zfill(2)}:{'30' if is_half_past else '00'}")

    plt.figure(figsize=(15, 7))
    plt.plot(time_labels, actual, label='Actual Demand (Test Set)')
    plt.plot(time_labels, predicted, label='Predicted Demand (Test Set)', alpha=0.7)
    plt.title(f'Actual vs Predicted Demand on Test Set - RMSE: {rmse:.2f}')
    plt.xlabel('Time of Day')
    plt.xticks(rotation=45)
    plt.ylabel('Demand')
    plt.legend()
    plt.tight_layout()
    plt.show()
# Run the interactive workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()
我已经尝试过调整下面这段代码中的索引方式,但没有任何效果。
# Sliding-window construction: each sample is `window_size` consecutive scaled
# values, and its target is the single value immediately after that window.
for start in range(len(df_scaled) - window_size):
    X.append(df_scaled[start:(start + window_size)])
    y.append(df_scaled[start + window_size])
这是因为您每次只预测一个时间步,而且没有以任何方式更新数据集:每个测试窗口的输入始终是真实的观测值。时间序列预测模型不是这样构建的。
您应该做的是先预测未来的 1 步,然后把该预测结果加入数据集中,再预测下一步,依此类推,直到覆盖完所有数据。
当您一次只预测一步时,模型可以拟合得很好;但是当预测的输出成为输入数据的一部分时,任何误差都会迅速累积并放大,以至于到测试期结束时,预测结果可能已经完全偏离实际值。
如果模型还不错,您通常会看到预测结果与实际结果在开始阶段很接近,一段时间之后才逐渐出现分歧。