我正在尝试制作一个用于股票预测的时间序列预测项目,该项目还显示特征权重(例如,数据的哪些方面最重要 - 即收盘价、成交量、技术指标、基本面等)和关注度。更具体地说,给定开始日期,模型应使用前 120 天的数据预测未来 30 天的收盘价。此外,它还应该打印出特征权重以及模型在进行预测时对过去 120 天数据的关注程度。
我已经用 LIME 以及一些并不算理想的方法计算出了特征权重,在股票价格的历史数据上模型仍然可用,RMSE 误差约为 10–15%。我目前不太担心 RMSE,因为我预计更好的数据、注意力机制以及更好的超参数调整会显著改善这一点。但是,我无法弄清楚如何向模型添加注意力机制,以及如何打印注意力权重。
这是我的训练脚本:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from keras.models import Model
from keras.layers import Input, LSTM, Dense, RepeatVector, TimeDistributed, Attention
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np
import json
import joblib
import sys
import os
from datetime import datetime
# Create sequences for single ticker
def create_sequences(input_data, target_data, window_size, forecast_horizon):
    """Slice a time series into supervised (window, horizon) pairs.

    Each sample pairs `window_size` consecutive rows of `input_data` with the
    `forecast_horizon` rows of `target_data` that immediately follow them.

    Returns a tuple (X, y) of numpy arrays with shapes
    (n_samples, window_size, n_features) and (n_samples, forecast_horizon, n_targets).
    """
    last_start = len(input_data) - forecast_horizon + 1
    windows = [input_data[end - window_size:end] for end in range(window_size, last_start)]
    horizons = [target_data[end:end + forecast_horizon] for end in range(window_size, last_start)]
    return np.array(windows), np.array(horizons)
def train(ticker, forecast_horizon, window_size, epochs=25, batch_size=32, training_split=0.9):
    """Train a seq2seq LSTM forecaster for one ticker and persist model + scalers.

    Parameters
    ----------
    ticker : str
        Key into ../../data/processed/data.json.
    forecast_horizon : int
        Number of future days to predict per sample.
    window_size : int
        Number of past days fed to the encoder.
    epochs, batch_size : int
        Passed straight to ``model.fit``.
    training_split : float
        Fraction of sequences used for training; the rest is validation.
    """
    # Load daily time-series data for the requested ticker.
    # Fixed: use a context manager instead of open()/close() so the file is
    # closed even if json.load raises.
    with open('../../data/processed/data.json') as f:
        data_json = json.load(f)
    ticker_data = data_json[ticker]['Time Series (Daily)']

    df = pd.DataFrame(ticker_data).T
    df = df.reset_index().rename(columns={'index': 'Date'})
    df['Date'] = pd.to_datetime(df['Date'])
    df.sort_values(by='Date', inplace=True)
    df.reset_index(drop=True, inplace=True)
    numeric_features = df.drop(columns=['Date']).columns

    # Scale features and target into [0, 1].
    # NOTE(review): fitting on the full series (including the validation tail)
    # leaks future min/max into training; consider fitting on the train split
    # only — kept as-is here to stay consistent with the prediction script.
    scaler_features = MinMaxScaler(feature_range=(0, 1))
    scaler_target = MinMaxScaler(feature_range=(0, 1))
    scaled_features = scaler_features.fit_transform(df[numeric_features])
    scaled_target = scaler_target.fit_transform(df[['close']])

    X, y = create_sequences(scaled_features, scaled_target, window_size, forecast_horizon)

    # Chronological train/validation split (no shuffling for time series).
    split = int(training_split * len(X))
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]

    # --- seq2seq architecture ---
    # Encoder: compresses the input window into its final hidden/cell state.
    encoder_inputs = Input(shape=(window_size, X.shape[2]))
    encoder = LSTM(50, return_state=True, dropout=0.2)
    encoder_outputs, state_h, state_c = encoder(encoder_inputs)
    encoder_states = [state_h, state_c]
    # Decoder: repeats the encoder summary once per forecast step and unrolls,
    # seeded with the encoder's final states.
    decoder_inputs = RepeatVector(forecast_horizon)(encoder_outputs)
    decoder_lstm = LSTM(50, return_sequences=True, dropout=0.2)
    decoder_outputs = decoder_lstm(decoder_inputs, initial_state=encoder_states)
    # One scalar (scaled close price) per forecast step.
    decoder_dense = TimeDistributed(Dense(1))
    decoder_outputs = decoder_dense(decoder_outputs)
    model = Model(encoder_inputs, decoder_outputs)

    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
              validation_data=(X_test, y_test))

    # Persist model and scalers. Fixed: create the output directory first so
    # model.save does not fail on a fresh checkout / new ticker.
    out_dir = f'../bin/{ticker}'
    os.makedirs(out_dir, exist_ok=True)
    model.save(f'{out_dir}/{forecast_horizon}f_{window_size}w_model.keras')
    joblib.dump(scaler_features, f'{out_dir}/{forecast_horizon}f_{window_size}w_scaler_features.pkl')
    joblib.dump(scaler_target, f'{out_dir}/{forecast_horizon}f_{window_size}w_scaler_target.pkl')
预测脚本:
import joblib
from keras.models import load_model
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import json
import sys
from datetime import datetime, timedelta
import os
from lime import lime_tabular
import concurrent.futures
# Create sequences for single ticker
def create_sequences(input_data, target_data, window_size, forecast_horizon):
    """Build supervised (X, y) pairs: X = trailing window, y = the next horizon.

    For every valid end index, X gets the preceding `window_size` rows of
    `input_data` and y gets the following `forecast_horizon` rows of
    `target_data`.
    """
    samples, targets = [], []
    end = window_size
    limit = len(input_data) - forecast_horizon + 1
    while end < limit:
        samples.append(input_data[end - window_size:end])
        targets.append(target_data[end:end + forecast_horizon])
        end += 1
    return np.array(samples), np.array(targets)
def load(ticker, window_size, forecast_horizon, root_path):
    """Load a trained model and its feature/target scalers from models/bin.

    Artifacts are looked up under
    ``<root_path>/models/bin/<ticker>/`` with the naming convention
    ``<horizon>f_<window>w_*`` used by the training script.

    Returns
    -------
    tuple of (model, scaler_features, scaler_target).
    """
    prefix = f'{forecast_horizon}f_{window_size}w'
    base = os.path.join(root_path, 'models/bin', ticker)
    model = load_model(os.path.join(base, f'{prefix}_model.keras'))
    scaler_features = joblib.load(os.path.join(base, f'{prefix}_scaler_features.pkl'))
    scaler_target = joblib.load(os.path.join(base, f'{prefix}_scaler_target.pkl'))
    return (model, scaler_features, scaler_target)
def calculate_rmspe(y_true, y_pred):
    """Return the Root Mean Squared Percentage Error as a percentage.

    A tiny epsilon is added to each denominator so exact-zero true values do
    not trigger a division-by-zero.
    """
    eps = 1e-10
    # Squared relative errors, then root of their mean, expressed in percent.
    squared_pct_errors = np.square((y_true - y_pred) / (y_true + eps))
    return np.sqrt(np.mean(squared_pct_errors)) * 100
def predict(ticker, window_size, forecast_horizon, start_date, cached_model, root_path):
    """Forecast `forecast_horizon` closing prices starting at `start_date`.

    Feeds the `window_size` days preceding `start_date` to the model and also
    computes per-feature LIME weights explaining the prediction.

    Parameters
    ----------
    ticker : str
        Ticker key in data.json.
    window_size : int
        Days of history fed to the model.
    forecast_horizon : int
        Days ahead to predict.
    start_date : str or datetime-like
        First day of the forecast window; advanced to the next date present
        in the data if it falls on a non-trading day.
    cached_model : tuple
        (model, scaler_features, scaler_target) as returned by load().
    root_path : str
        Project root used to resolve the data file.

    Returns
    -------
    dict with predicted/actual prices, dates, rmse/rmspe and feature_importance.
    """
    model, scaler_features, scaler_target = cached_model

    # Load daily data for the ticker (context manager instead of open/close).
    data_path = os.path.join(root_path, 'data/processed/data.json')
    with open(data_path) as f:
        data_json = json.load(f)
    ticker_data = data_json[ticker]['Time Series (Daily)']

    df = pd.DataFrame(ticker_data).T
    df = df.reset_index().rename(columns={'index': 'Date'})
    df['Date'] = pd.to_datetime(df['Date'])
    df.sort_values(by='Date', inplace=True)
    df.reset_index(drop=True, inplace=True)
    numeric_features = df.drop(columns=['Date']).columns
    n_features = len(numeric_features)

    # BUG FIX: transform, not fit_transform. The scalers were fitted at
    # training time; refitting them here silently changes the scaling whenever
    # the data file differs from the training data, feeding the model inputs
    # on a different scale than it was trained on.
    scaled_features = scaler_features.transform(df[numeric_features])
    scaled_target = scaler_target.transform(df[['close']])

    X, y = create_sequences(scaled_features, scaled_target, window_size, forecast_horizon)

    # Chronological split; only the training portion is used, as LIME
    # background data.
    split = int(0.9 * len(X))
    X_train = X[:split]

    # Advance start_date to the next date actually present in the data
    # (skips weekends/holidays).
    dt = pd.to_datetime(start_date) if isinstance(start_date, str) else start_date
    while len(df.index[df['Date'] == dt].tolist()) == 0:
        dt = dt + pd.Timedelta(1, unit='d')
    start_date = dt
    start_idx = df.index[df['Date'] == pd.to_datetime(start_date)].tolist()[0]

    # Input window: the window_size days immediately preceding the start date.
    input_sequence = scaled_features[start_idx - window_size:start_idx]
    input_sequence = input_sequence.reshape((1, window_size, -1))

    def lstm_predict_wrapper(model, data_2d):
        # LIME perturbs flat feature vectors; embed each one as the last
        # timestep of an otherwise-zero window so the 3D model can score it.
        num_samples = data_2d.shape[0]
        # Fixed: feature count was hard-coded to 26; derive it from the data
        # so the wrapper works for any feature set.
        data_3d = np.zeros((num_samples, window_size, n_features))
        data_3d[:, -1, :] = data_2d
        predictions = model.predict(data_3d)
        return predictions[:, -1, 0]

    # Explain the last timestep of the input window with LIME.
    input_sequence_for_lime = input_sequence[0, -1, :].reshape(1, -1)
    explainer = lime_tabular.LimeTabularExplainer(
        training_data=X_train.reshape(-1, X_train.shape[2]),  # flatten windows
        feature_names=numeric_features,
        mode='regression'
    )
    exp = explainer.explain_instance(
        input_sequence_for_lime[0],
        lambda x: lstm_predict_wrapper(model, x),
        num_features=n_features
    )
    feature_importance = {feature: weight for feature, weight in exp.as_list()}

    # Forecast and map back to the original price scale.
    predicted_prices_scaled = model.predict(input_sequence)
    predicted_prices = scaler_target.inverse_transform(
        predicted_prices_scaled.reshape(-1, 1)).reshape(-1, forecast_horizon)
    predicted_prices = predicted_prices.flatten()

    actual_prices_scaled = scaled_target[start_idx:start_idx + forecast_horizon]
    actual_prices = scaler_target.inverse_transform(actual_prices_scaled).flatten()
    actual_prices_full = scaler_target.inverse_transform(scaled_target)

    # Fixed: if the forecast window runs past the end of the data, compare
    # only the overlapping days instead of crashing on a length mismatch.
    n_overlap = len(actual_prices)
    rmse = np.sqrt(mean_squared_error(actual_prices, predicted_prices[:n_overlap]))
    rmspe = calculate_rmspe(actual_prices, predicted_prices[:n_overlap])

    dates_full = pd.to_datetime(df['Date'])
    forecast_dates = dates_full[start_idx:start_idx + forecast_horizon]
    return {
        'ticker': ticker,
        'window_size': window_size,
        'forecast_horizon': forecast_horizon,
        'start_date': start_date,
        'predicted_prices': predicted_prices,
        'forecast_dates': forecast_dates,
        'dates_full': dates_full,
        'actual_prices': actual_prices,
        'actual_prices_full': actual_prices_full,
        'rmse': rmse,
        'rmspe': rmspe,
        'feature_importance': feature_importance
    }
# Visualize the results
def visualize(predictions):
rmses = [p['rmse'] for p in predictions]
rmspes = [p['rmspe'] for p in predictions]
print(f'Average RMSE:', sum(rmses)/len(rmses))
print(f'Average RMSPE:', sum(rmspes)/len(rmspes))
print('\nAverage Feature Importance:')
data = dict()
for p in predictions:
for f,w in p['feature_importance'].items():
if f not in data.keys():
data[f] = 0
else:
data[f] += float(w)
for k,v in data.items():
data[k] /= len(predictions)
print(data)
p0 = predictions[0]
plt.figure(figsize=(15, 7))
plt.plot(p0['dates_full'],p0['actual_prices_full'], label='Actual Prices', color='blue')
plt.plot(p0['forecast_dates'],p0['predicted_prices'], label='Predicted Prices', color='red')
for p in predictions:
plt.plot(p['forecast_dates'],p['predicted_prices'], color='red')
plt.title(f'Actual vs Predicted Stock Prices for the next {p["forecast_horizon"]} days')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.show()
我在使用 keras
Attention()
层并理解它在该模型结构中的位置时遇到了麻烦,我还想知道如何最终检索注意力权重并将其转发到 predictions
字典中predict()
输出,就像我对特征权重所做的那样。
首先,在 NLP 中,注意力机制在计算 token 到 token 的"注意力"时在理论上是合乎逻辑的,并且它在 seq2seq LSTM 和 Transformer 中都取得了成功。其有效的主要原因在于,自然语言(任何语言)中字母和单词的组合非常有限。如果你考虑字母表 Σ、所有可能的串 Σ* 以及自然语言 Σ′,那么显然 |Σ′| << |Σ*|(尽管理论上 Σ* 是无限的)。总之,我的观点是:无论是子词级还是词级的 token 到 token 依赖都比较容易被捕捉,因为其中存在大量重复出现的模式和上下文。
在连续域建模(例如价格预测)中,它本质上成为一个更加困难的问题,因为初始空间不是离散的、连续的,并且你基本上没有某种有限词汇,而是有一个域实数或自然数。
我不得不说,我不确定注意力机制是否常被应用于此类连续域任务的深度学习模型中。不过,我最初的建议是:先理解注意力机制及其实际作用,然后再把它加入模型并重新训练。在 LSTM 中,有一种突出的注意力机制,即一种称为 Bahdanau Attention 的加性注意力机制(以论文第一作者命名)。如果你不想通读这篇论文,也有一些不错的"快速导读"文章,但我还是强烈建议你阅读原文;此外,PyTorch 官方也提供了相关的 seq2seq 注意力教程可以参考。