我正在尝试训练贝叶斯神经网络来进行噪声时间序列预测,但遇到了以下问题:
import numpy as np
import matplotlib.pyplot as plt
lookBack = 128
inputDim = 1
lookAHead = 32
split = 0.15

# --- Synthetic data: linear trend plus Gaussian noise whose scale grows with time ---
data_length = 10000
y_data = np.array([
    0.5 * t + 2 + np.random.normal(0, t * t / 100000)
    for t in range(data_length)
])
x_data = np.arange(data_length)

# Standardize the series to zero mean and unit variance.
y_data = (y_data - np.mean(y_data)) / np.std(y_data)

plt.plot(x_data, y_data)
plt.show()
# --- Build sliding-window inputs and single-step targets ---
# Each sample is a lookBack-long window of the series; its target is the
# single value lookAHead steps past the window's end.
train_data_input = []
train_data_target = []
# BUG FIX: the original iterated over `shape[0]`, an undefined name
# (NameError); the intended bound is the series length.
for ii in range(len(y_data) - lookBack - lookAHead):
    train_data_input.append(np.array(y_data[ii:ii+lookBack]))
    train_data_target.append(y_data[ii + lookBack + lookAHead])
train_data_input = np.array(train_data_input)
# Add a trailing feature axis: (samples, lookBack) -> (samples, lookBack, 1).
train_data_input = np.expand_dims(train_data_input, axis=-1)
train_data_target = np.array(train_data_target)
我的模型:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_probability as tfp
from tensorflow.keras.layers import Dense, Input
# Build the BNN: flatten the (lookBack, 1) window, pass it through a
# deterministic hidden layer, then a variational dense layer whose weights
# are distributions (sampled on every forward pass).
inputModel = Input(shape=(lookBack, inputDim))
features = keras.layers.Flatten()(inputModel)
features = layers.Dense(32, activation = "tanh")(features)
# NOTE(review): `prior` and `posterior` are not defined in this snippet —
# presumably one of prior1/prior2 and posterior1/posterior2 below; confirm.
features = tfp.layers.DenseVariational(
    units=8,
    make_prior_fn=prior,
    make_posterior_fn=posterior,
    # KL term scaled by the number of training examples (excluding the
    # validation split) so the ELBO is averaged per-example.
    kl_weight=1/(train_data_input.shape[0]*(1-split)),
    activation="tanh",
)(features)
我尝试了不同的输出类型,只是回归值或学习到的分布:
# Output head: either a plain regression value ("val") or a learned Normal
# distribution ("dist") whose loc/scale are produced by a Dense(2) layer.
if output_type == "val":
    outputs = layers.Dense(1)(features)
if output_type == "dist":
    distribution_params = Dense(2)(features)
    outputs = tfp.layers.IndependentNormal(1)(distribution_params)

model = keras.Model(inputs=inputModel, outputs=outputs)

def negative_loglikelihood(targets, estimated_distribution):
    """Negative log-likelihood loss for a distribution-valued output."""
    return -estimated_distribution.log_prob(targets)

loss = None
if output_type == "dist":  # BUG FIX: the colon was missing here (SyntaxError)
    loss = negative_loglikelihood
if output_type == "val":
    loss = tf.keras.losses.MeanSquaredError()

# NOTE(review): `optimizer` is not defined in this snippet — confirm it is
# created elsewhere (e.g. keras.optimizers.Adam()).
model.compile(
    optimizer=optimizer,
    loss=loss
)
model.fit(x=train_data_input, y=train_data_target, epochs=25, validation_split=split)
我尝试了不同的单位大小、损失函数(-log_prob、KLD)、激活函数、(可学习的)先验(1/2,见下文)和后验(1/2,见下文),但我的模型并没有真正学习到有用的东西。此外,我预计输出的 stdDev 会随着 x 的增加而增加,因为噪声也在增加,但事实并非如此。
def prior1(kernel_size, bias_size, dtype=None):
    """Fixed (non-trainable) standard-normal prior over all weights.

    Returns a Keras model whose output is a MultivariateNormalDiag(0, I)
    over the n = kernel_size + bias_size weight entries.
    """
    n = kernel_size + bias_size
    prior_model = keras.Sequential(
        [
            tfp.layers.DistributionLambda(
                # BUG FIX: the original had a stray trailing '.' after
                # tf.ones(n), which is a SyntaxError.
                lambda t: tfp.distributions.MultivariateNormalDiag(
                    loc=tf.zeros(n), scale_diag=tf.ones(n)
                )
            )
        ]
    )
    return prior_model
def prior2(kernel_size, bias_size, dtype=None):
    """Trainable prior: independent Normals with learnable loc and scale."""
    n = kernel_size + bias_size
    c = np.log(np.expm1(1.0))  # inverse-softplus of 1, so softplus(c) == 1

    def _make_dist(t):
        loc = t[..., :n]
        scale = 1e-3 + tf.math.softplus(c + 0.05 * t[..., n:])
        return tfp.distributions.Independent(
            tfp.distributions.Normal(loc=loc, scale=scale),
            reinterpreted_batch_ndims=1)

    return tf.keras.Sequential([
        tfp.layers.VariableLayer(2*n, dtype=dtype),
        tfp.layers.DistributionLambda(_make_dist),
    ])
def posterior1(kernel_size, bias_size, dtype=None):
    """Full-covariance Gaussian posterior via MultivariateNormalTriL."""
    n = kernel_size + bias_size
    # Number of free parameters for an n-dim MVN with lower-triangular scale.
    param_count = tfp.layers.MultivariateNormalTriL.params_size(n)
    return keras.Sequential([
        tfp.layers.VariableLayer(param_count, dtype=dtype),
        tfp.layers.MultivariateNormalTriL(n),
    ])
def posterior2(kernel_size, bias_size=0, dtype=None):
    """Mean-field posterior: independent Normals with trainable loc/scale."""
    n = kernel_size + bias_size
    c = np.log(np.expm1(1.0))  # inverse-softplus of 1, so softplus(c) == 1

    def _make_dist(t):
        loc = t[..., :n]
        scale = 1e-5 + tf.nn.softplus(c + t[..., n:])
        return tfp.distributions.Independent(
            tfp.distributions.Normal(loc=loc, scale=scale),
        )

    return keras.Sequential([
        tfp.layers.VariableLayer(2 * n, dtype=dtype),
        tfp.layers.DistributionLambda(_make_dist),
    ])
模型评估:
# --- Model evaluation: 20 stochastic forward passes (the variational layer
# samples new weights on each call) ---
predictions = []
means = []
stds = []
for ii in range(20):
    prediction = model(train_data_input)
    if output_type == "val":
        predictions.append(prediction)
    if output_type == "dist":
        # Distribution output: record per-pass mean and stddev.
        means.append(prediction.mean())
        stds.append(prediction.stddev())
if output_type == "val":
    # Epistemic uncertainty only: spread of point predictions across passes.
    predictions = np.array(predictions)
    predicted_mean = np.mean(predictions, axis = 0)
    predicted_std = np.std(predictions, axis = 0)
if output_type == "dist":
    # Average the predicted distribution parameters across passes.
    predicted_mean = np.mean(np.array(means), axis = 0)
    predicted_std = np.mean(np.array(stds), axis = 0)
lower_bound = predicted_mean - predicted_std
upper_bound = predicted_mean + predicted_std
plt.plot(x_data, y_data)
#Plot the mean
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(predicted_mean), color = "r")
#Plot the beginning of the test set
plt.vlines(x=float(1-split)*predicted_mean.shape[0] + lookBack + lookAHead,ymin = np.min(predicted_mean), ymax = np.max(predicted_mean), colors = "g")
#Plot the hopefully increasing stdDev
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, predicted_std, color = "g")
#Plot mean +/- stdDev
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(lower_bound), color = "r", alpha = 0.5)
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(upper_bound), color = "r", alpha = 0.5)
我感谢任何有关如何让模型学习的建议。
在下面的代码中,根据每个时间点的已知平均值(y_mean)和标准差(y_std)生成随机输入信号(y_rv)。该模型从输入信号中获取一个窗口,并学习预测用于对该信号进行采样的平均值和标准差(即输入为 y_rv,目标为 y_mean 和 y_std)。
我首先将模型简化为预测平均值的单输出回归模型。 ReLU 激活有助于稳定训练。一旦基本模型开始工作,我就添加了标准差的第二个输出。
我修改了模型和目标,以便模型预测未来最多 32 个点的值序列,而不是简单地预测 t=32 时的单个向量。我认为这为模型在预测标准差时提供了更多背景信息,而不必依赖单个数据点。添加 dropout 还有助于模型稳定标准差的预测。直接估计标准差比估计 log(std) 效果更好。
显示训练集中的单个窗口:
目标和预测:
import numpy as np
import matplotlib.pyplot as plt
import random
random.seed(0)
np.random.seed(0)

lookBack = 128
lookAHead = 32
split = 0.15

# --- Synthetic data: known per-step mean/std plus one sampled realization ---
data_length = 10_000
y_mean = np.empty(data_length)
y_std = np.empty_like(y_mean)
y_rv = np.empty_like(y_mean)
for t in range(data_length):
    y_mean[t] = 0.5 * t + 2        # linear trend
    y_std[t] = t**2 / 100_000      # noise scale grows quadratically
    # Sample one realization from Normal(mean, std) at each time step.
    y_rv[t] = np.random.normal(loc=y_mean[t], scale=y_std[t])

# --- Sliding windows: input = lookBack past samples of y_rv;
# targets = the next lookAHead values of mean/std/rv beyond each window ---
train_input, train_target_mean, train_target_std, train_target_rv = [], [], [], []
for start in range(data_length - lookBack - lookAHead):
    stop = start + lookBack
    train_input.append(np.array(y_rv[start:stop]))
    train_target_mean.append(np.array(y_mean[stop:stop + lookAHead]))
    train_target_std.append(np.array(y_std[stop:stop + lookAHead]))
    train_target_rv.append(np.array(y_rv[stop:stop + lookAHead]))

train_input = np.array(train_input)
train_target_mean = np.array(train_target_mean)
train_target_std = np.array(train_target_std)
train_target_rv = np.array(train_target_rv)

# Standardize each input column across all windows.
train_input_scaled = (train_input - train_input.mean(axis=0)) / train_input.std(axis=0)

# Stack mean/std targets into shape (samples, lookAHead, 2).
train_target_mean_std = np.stack(
    [train_target_mean, train_target_std],
    axis=2
)
#
#View a window of data
#
#Select window to plot
window_start = 500
window_end = window_start + lookBack
window_range = list(range(window_start, window_end))
# Target range extends lookAHead points beyond the input window.
window_range_target = list(range(window_start, window_end + lookAHead))
#Training data over window i
plt.plot(window_range, train_input[window_start, :],
         color='tab:green', label=f'train_input[window #{window_start}] (y_rv)', zorder=3)
#Target over window i + lookAHead
plt.plot(window_range_target,
         y_mean[window_start:window_end + lookAHead],
         color='tab:red', linestyle=':',
         label=f'y_mean[{window_start}:{window_start}+{lookBack}+{lookAHead}]')
plt.xlabel('index')
plt.ylabel('y')
#y std: draw the known +/- 2-sigma band around the true mean
plt.plot(window_range_target,
         y_mean[window_start:window_end + lookAHead] +\
         2 * y_std[window_start:window_end + lookAHead],
         color='tab:purple', linewidth=3, label='±2*y_std')
plt.plot(window_range_target,
         y_mean[window_start:window_end + lookAHead] -\
         2 * y_std[window_start:window_end + lookAHead],
         color='tab:purple', linewidth=3)
plt.ylabel('y')
plt.axvline(x=window_end, linestyle=':', linewidth=1, color='k', label='window ends')
plt.gcf().set_size_inches(7, 2.5)
plt.gcf().legend(bbox_to_anchor=(0.7, 1.1))
plt.margins(x=0)
plt.show()
#
#Model
#
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
#Define model
tf.random.set_seed(0)
tf.keras.backend.clear_session()
# Keras documents `shape` as a tuple excluding the batch dimension; the
# original passed a bare int, which not every Keras version accepts.
inputModel = layers.Input(shape=(lookBack,))
features = layers.Flatten()(inputModel)
features = layers.Dense(64, activation="relu")(features)
features = layers.Dropout(rate=0.2)(features)
features = layers.Dense(32, activation="relu")(features)
# Two heads, lookAHead values each: predicted mean sequence and std sequence.
outputs = [layers.Dense(lookAHead, name='mean_out')(features),
           layers.Dense(lookAHead, name='std_out')(features)
           ]
#Compile model: equal-weighted MSE on both heads.
model = keras.Model(inputs=inputModel, outputs=outputs)
loss = tf.keras.losses.MeanSquaredError()
model.compile(optimizer='adam', loss=loss, loss_weights=[0.5, 0.5])
#Fit model
model.fit(x=train_input_scaled, y=[train_target_mean, train_target_std],
          epochs=20, batch_size=32, validation_split=split)
#
# Plot input, targets, and predictions
#
#Get predictions (single forward pass; dropout is presumably inactive here
#since the model is called without training=True — confirm)
pred_mean, pred_std = [t.numpy() for t in model(train_input_scaled)]
#Plot
x_pred = list(range(lookBack, data_length - lookAHead))
plt.plot(x_pred, y_rv[x_pred], c='tab:brown', lw=2, alpha=0.4, label='input features')
plt.plot(x_pred, y_mean[x_pred], c='black', lw=5, label='target mean')
plt.plot(x_pred, y_mean[x_pred] + 2 * y_std[x_pred], c='black', lw=4, ls=':', label='target std')
plt.plot(x_pred, y_mean[x_pred] - 2 * y_std[x_pred], c='black', lw=4, ls=':')
# [:, -1] selects each window's furthest-ahead (t + lookAHead) prediction.
plt.plot(x_pred, pred_mean[:, -1], c='tab:cyan', lw=0.3, label='predicted mean')
plt.plot(x_pred, pred_mean[:, -1] + 2 * pred_std[:, -1],
         c='tab:cyan', lw=0.5, label='predicted std')
plt.plot(x_pred, pred_mean[:, -1] - 2 * pred_std[:, -1],
         c='tab:cyan', lw=0.5)
plt.axvline(x=(1 - split) * len(train_input_scaled),
            c='black', lw=1, ls=':', label='train-val split')
plt.xlabel('index')
plt.ylabel('y')
plt.title('Input, targets, and predictions')
plt.gcf().set_size_inches(5, 3)
plt.gcf().legend(ncol=2, bbox_to_anchor=(0.9, 1.25))
plt.show()
该网络在进行预测时仅查看输入特征,并且不使用其之前的预测。允许网络访问其之前的预测将使其成为一个循环网络,这是时间序列的通常选择。
sklearn 中的 GaussianProcessRegressor 直接对时间序列的均值和标准差进行建模,如果您想将神经网络与其他技术进行比较,它可以提供良好的基线测量。