I'm trying to train a Bayesian neural network for noisy time-series prediction. I have the following problem:
import numpy as np
import matplotlib.pyplot as plt
lookBack = 128
inputDim = 1
lookAHead = 32
split = 0.15
# create data
data_length = 10000
y_data = []
for ii in range(data_length):
y_data.append(0.5*ii + 2 + np.random.normal(0, ii*ii/100000))
x_data = np.arange(data_length)
#normalize data
y_data = np.array(y_data)
y_data = (y_data - np.mean(y_data)) / np.std(y_data)
plt.plot(x_data, y_data)
plt.show()
#construct input/target data
train_data_input = []
train_data_target = []
for ii in range(y_data.shape[0] - lookBack - lookAHead):
train_data_input.append(np.array(y_data[ii:ii+lookBack]))
train_data_target.append(y_data[ii+lookBack + lookAHead])
train_data_input = np.array(train_data_input)
train_data_input = np.expand_dims(train_data_input, axis = -1)
train_data_target = np.array(train_data_target)
My model:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_probability as tfp
from tensorflow.keras.layers import Dense, Input
inputModel = Input(shape=(lookBack, inputDim))
features = keras.layers.Flatten()(inputModel)
features = layers.Dense(32, activation = "tanh")(features)
features = tfp.layers.DenseVariational(
units=8,
make_prior_fn=prior,          #one of prior1/prior2, defined below
make_posterior_fn=posterior,  #one of posterior1/posterior2, defined below
kl_weight=1/(train_data_input.shape[0]*(1-split)),
activation="tanh",
)(features)
I've tried different output types, either a plain regression value or a learned distribution:
if output_type == "val":
outputs = layers.Dense(1)(features)
if output_type == "dist":
distribution_params = Dense(2)(features)
#c = np.log(np.expm1(1.0))
#outputs = tfp.layers.DistributionLambda(lambda t: tfp.distributions.Normal(loc=t[..., :1],
# scale=1e-3 + tf.math.softplus(c+0.05 * t[..., 1:])))(distribution_params)
outputs = tfp.layers.IndependentNormal(1)(distribution_params)
model = keras.Model(inputs=inputModel, outputs=outputs)
def negative_loglikelihood(targets, estimated_distribution):
return -estimated_distribution.log_prob(targets)
loss = None
if output_type == "dist"
loss = negative_loglikelihood
#def loss(y_true, y_pred):
# return tf.keras.losses.KLD(y_true, y_pred)
if output_type == "val":
loss = tf.keras.losses.MeanSquaredError()
model.compile(
optimizer="adam",  #any standard optimizer
loss=loss
)
model.fit(x = train_data_input, y = train_data_target, epochs=25, validation_split=split)
I've tried different unit sizes, loss functions (-log_prob, KLD), activation functions, and (learnable) priors and posteriors (versions 1 and 2, see below), but my model doesn't really learn anything useful. I also expected the predicted stddev to increase with x, since the noise increases too, but that isn't the case.
def prior1(kernel_size, bias_size, dtype=None):
n = kernel_size + bias_size
prior_model = keras.Sequential(
[
tfp.layers.DistributionLambda(
lambda t: tfp.distributions.MultivariateNormalDiag(
loc=tf.zeros(n), scale_diag=tf.ones(n)
)
)
]
)
return prior_model
def prior2(kernel_size, bias_size, dtype=None):
n = kernel_size + bias_size
c = np.log(np.expm1(1.0))
return tf.keras.Sequential([
tfp.layers.VariableLayer(2*n, dtype=dtype),
tfp.layers.DistributionLambda(lambda t: tfp.distributions.Independent(
tfp.distributions.Normal(loc=t[...,:n], scale=1e-3 + tf.math.softplus(c + 0.05 * t[..., n:])), reinterpreted_batch_ndims=1))
])
def posterior1(kernel_size, bias_size, dtype=None):
n = kernel_size + bias_size
posterior_model = keras.Sequential(
[
tfp.layers.VariableLayer(
tfp.layers.MultivariateNormalTriL.params_size(n), dtype=dtype
),
tfp.layers.MultivariateNormalTriL(n),
]
)
return posterior_model
def posterior2(kernel_size, bias_size=0, dtype=None):
n = kernel_size + bias_size
c = np.log(np.expm1(1.0))
return keras.Sequential([
tfp.layers.VariableLayer(2 * n, dtype=dtype),
tfp.layers.DistributionLambda(lambda t: tfp.distributions.Independent(
tfp.distributions.Normal(loc=t[..., :n],
scale=1e-5 + tf.nn.softplus(c + t[..., n:])),
)),
])
Model evaluation:
predictions = []
means = []
stds = []
for ii in range(20):
prediction = model(train_data_input)
if output_type == "val":
predictions.append(prediction)
if output_type == "dist":
means.append(prediction.mean())
stds.append(prediction.stddev())
if output_type == "val":
predictions = np.array(predictions)
predicted_mean = np.mean(predictions, axis = 0)
predicted_std = np.std(predictions, axis = 0)
if output_type == "dist":
predicted_mean = np.mean(np.array(means), axis = 0)
predicted_std = np.mean(np.array(stds), axis = 0)
lower_bound = predicted_mean - predicted_std
upper_bound = predicted_mean + predicted_std
plt.plot(x_data, y_data)
#Plot the mean
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(predicted_mean), color = "r")
#Plot the beginning of the test set
plt.vlines(x=(1-split)*predicted_mean.shape[0] + lookBack + lookAHead, ymin=np.min(predicted_mean), ymax=np.max(predicted_mean), colors="g")
#Plot the hopefully increasing stdDev
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, predicted_std, color = "g")
#Plot mean +/- stdDev
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(lower_bound), color = "r", alpha = 0.5)
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(upper_bound), color = "r", alpha = 0.5)
I'd appreciate any advice on how to get the model to learn.
In the code below, a random input signal (y_rv) is generated from a known mean and standard deviation at each time point (y_mean and y_std). The model takes a window of the input signal and learns to predict the mean and standard deviation that were used to sample that signal (i.e. the input is y_rv, and the targets are y_mean and y_std).
I first simplified the model down to a single-output regression model that predicts the mean. ReLU activations helped stabilize training. Once that basic model was working, I added a second output for the standard deviation; a minimal sketch of that baseline is below.
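For illustration, a minimal sketch of what that mean-only baseline could look like, using the same names as the full code below (the layer sizes here are placeholders, not tuned values):
from tensorflow import keras
from tensorflow.keras import layers
lookBack, lookAHead = 128, 32   #as in the data-construction code below
#Mean-only baseline: one regression head, no std output yet
baseline_in = layers.Input(shape=(lookBack,))
h = layers.Dense(64, activation='relu')(baseline_in)
h = layers.Dense(32, activation='relu')(h)
baseline_out = layers.Dense(lookAHead, name='mean_out')(h)
baseline = keras.Model(inputs=baseline_in, outputs=baseline_out)
baseline.compile(optimizer='adam', loss='mse')
#baseline.fit(train_input_scaled, train_target_mean, epochs=20, validation_split=0.15)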
I modified the model and targets so that the model predicts a sequence of values up to 32 points into the future, rather than just the single value at t=32. I think this gives the model more context for predicting the standard deviation, rather than having to rely on a single data point. Adding dropout also helped stabilize the standard-deviation predictions. Estimating the standard deviation directly worked better than estimating log(std); a sketch of the log-std variant follows.
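For reference, the log(std) variant only changes the parameterization of the second head's target; a sketch (train_target_std and the two-output model are named as in the code below; the epsilon is needed because y_std starts at exactly 0):
import numpy as np
def to_log_std_target(train_target_std, eps=1e-8):
    #train the std head against log(std) targets; eps avoids log(0)
    return np.log(train_target_std + eps)
def std_from_log_pred(log_std_pred):
    #invert the transform on the model's std-head predictions
    return np.exp(log_std_pred)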
Showing a single window from the training set:
Targets and predictions:
import numpy as np
import matplotlib.pyplot as plt
import random
random.seed(0)
np.random.seed(0)
lookBack = 128
lookAHead = 32
split = 0.15
# create data
data_length = 10_000
y_mean = np.empty(data_length)
y_std = np.empty_like(y_mean)
y_rv = np.empty_like(y_mean)
for ii in range(data_length):
mean = 0.5*ii + 2
std = ii**2 / 100_000
y_mean[ii] = mean
y_std[ii] = std
#Sample an rv from the Normal(mean, std) distribution
y_rv[ii] = np.random.normal(loc=mean, scale=std)
#construct input/target data
train_input = []
train_target_mean = []
train_target_std = []
train_target_rv = []
for ii in range(data_length - lookBack - lookAHead):
#For ii, train will use a lookBack-sized window starting at ii
train_input.append(np.array(y_rv[ii:ii + lookBack]))
#Use it to predict the value(s) that's lookAHead beyond the window end
target_start = ii + lookBack
target_end = target_start + lookAHead
train_target_mean.append( np.array(y_mean[target_start:target_end]) )
train_target_std.append( np.array(y_std[target_start:target_end]) )
train_target_rv.append( np.array(y_rv[target_start:target_end]) )
train_input = np.array(train_input)
train_target_mean = np.array(train_target_mean)
train_target_std = np.array(train_target_std)
train_target_rv = np.array(train_target_rv)
train_input_scaled = (train_input - train_input.mean(axis=0)) / train_input.std(axis=0)
train_target_mean_std = np.stack(
[train_target_mean, train_target_std],
axis=2
)
#
#View a window of data
#
#Select window to plot
window_start = 500
window_end = window_start + lookBack
window_range = list(range(window_start, window_end))
window_range_target = list(range(window_start, window_end + lookAHead))
#Training data over window i
plt.plot(window_range, train_input[window_start, :],
color='tab:green', label=f'train_input[window #{window_start}] (y_rv)', zorder=3)
#Target over window i + lookAHead
plt.plot(window_range_target,
y_mean[window_start:window_end + lookAHead],
color='tab:red', linestyle=':',
label=f'y_mean[{window_start}:{window_start}+{lookBack}+{lookAHead}]')
plt.xlabel('index')
plt.ylabel('y')
#y std
plt.plot(window_range_target,
y_mean[window_start:window_end + lookAHead] +\
2 * y_std[window_start:window_end + lookAHead],
color='tab:purple', linewidth=3, label='±2*y_std')
plt.plot(window_range_target,
y_mean[window_start:window_end + lookAHead] -\
2 * y_std[window_start:window_end + lookAHead],
color='tab:purple', linewidth=3)
plt.ylabel('y')
plt.axvline(x=window_end, linestyle=':', linewidth=1, color='k', label='window ends')
plt.gcf().set_size_inches(7, 2.5)
plt.gcf().legend(bbox_to_anchor=(0.7, 1.1))
plt.margins(x=0)
plt.show()
#
#Model
#
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
#Define model
tf.random.set_seed(0)
tf.keras.backend.clear_session()
inputModel = layers.Input(shape=(lookBack,))
features = layers.Flatten()(inputModel)
features = layers.Dense(64, activation="relu")(features)
features = layers.Dropout(rate=0.2)(features)
features = layers.Dense(32, activation="relu")(features)
outputs = [layers.Dense(lookAHead, name='mean_out')(features),
layers.Dense(lookAHead, name='std_out')(features)
]
#Compile model
model = keras.Model(inputs=inputModel, outputs=outputs)
loss = tf.keras.losses.MeanSquaredError()
model.compile(optimizer='adam', loss=loss, loss_weights=[0.5, 0.5])
#Fit model
model.fit(x=train_input_scaled, y=[train_target_mean, train_target_std],
epochs=20, batch_size=32, validation_split=split)
#
# Plot input, targets, and predictions
#
#Get predictions
pred_mean, pred_std = [t.numpy() for t in model(train_input_scaled)]
#Plot
x_pred = list(range(lookBack, data_length - lookAHead))
plt.plot(x_pred, y_rv[x_pred], c='tab:brown', lw=2, alpha=0.4, label='input features')
plt.plot(x_pred, y_mean[x_pred], c='black', lw=5, label='target mean')
plt.plot(x_pred, y_mean[x_pred] + 2 * y_std[x_pred], c='black', lw=4, ls=':', label='target std')
plt.plot(x_pred, y_mean[x_pred] - 2 * y_std[x_pred], c='black', lw=4, ls=':')
plt.plot(x_pred, pred_mean[:, -1], c='tab:cyan', lw=0.3, label='predicted mean')
plt.plot(x_pred, pred_mean[:, -1] + 2 * pred_std[:, -1],
c='tab:cyan', lw=0.5, label='predicted std')
plt.plot(x_pred, pred_mean[:, -1] - 2 * pred_std[:, -1],
c='tab:cyan', lw=0.5)
plt.axvline(x=(1 - split) * len(train_input_scaled),
c='black', lw=1, ls=':', label='train-val split')
plt.xlabel('index')
plt.ylabel('y')
plt.title('Input, targets, and predictions')
plt.gcf().set_size_inches(5, 3)
plt.gcf().legend(ncol=2, bbox_to_anchor=(0.9, 1.25))
plt.show()
The network only looks at the input features when it makes a prediction; it doesn't use its own previous predictions. Allowing the network to access its previous predictions would make it a recurrent network, which is the usual choice for time series; a sketch follows.
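For example, a first recurrent version could swap the Flatten+Dense stack for an LSTM over the input window (a minimal sketch with illustrative sizes; note it still conditions only on the inputs, not on fed-back predictions):
from tensorflow import keras
from tensorflow.keras import layers
lookBack, lookAHead = 128, 32
#Treat the window as a (time, features) sequence instead of flattening it
rnn_in = layers.Input(shape=(lookBack, 1))
h = layers.LSTM(32)(rnn_in)          #recurrent summary of the window
rnn_outputs = [layers.Dense(lookAHead, name='mean_out')(h),
               layers.Dense(lookAHead, name='std_out')(h)]
rnn_model = keras.Model(inputs=rnn_in, outputs=rnn_outputs)
rnn_model.compile(optimizer='adam', loss='mse', loss_weights=[0.5, 0.5])
#The inputs need a trailing channel axis, e.g. train_input_scaled[..., None]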
sklearn's GaussianProcessRegressor models the time series' mean and standard deviation directly, and provides a good baseline measurement if you want to compare the neural network against other techniques.
Estimating the mean and standard deviation from the noisy signal (which could be used to create a training set):
t_axis = np.arange(data_length).reshape(-1, 1)
plt.plot(t_axis, y_rv, linestyle='none', marker='.', alpha=0.8, label='noisy data')
plt.plot(y_mean, c='k', label='ground truth μ±2σ', lw=10, alpha=0.3)
plt.plot(y_mean - 2 * y_std, c='k', lw=10, alpha=0.3)
plt.plot(y_mean + 2 * y_std, c='k', lw=10, alpha=0.3)
#GPR estimate of mean/std
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process import kernels
kernel = kernels.DotProduct() + 0.1 * kernels.DotProduct()**2 * kernels.WhiteKernel()
gpr = GaussianProcessRegressor(
kernel=kernel, random_state=np.random.RandomState(0)
)
gpr.fit(t_axis, y_rv)
gpr_mean, gpr_sd = gpr.predict(t_axis, return_std=True)
plt.plot(t_axis, gpr_mean, c='tab:orange', lw=4, label='gaussian process')
plt.plot(t_axis, gpr_mean - 2 * gpr_sd, lw=4, c='tab:orange')
plt.plot(t_axis, gpr_mean + 2 * gpr_sd, lw=4, c='tab:orange')
#Rolling window estimate of mean/std
window_size = 30
import pandas as pd
df = pd.DataFrame(y_rv, columns=['Values'])
rolling_mean = df.Values.rolling(window=window_size, min_periods=1).mean()
rolling_sd = df.Values.rolling(window=window_size, min_periods=1).std()
rolling_sd.iloc[0] = rolling_sd.iloc[1]
plt.plot(rolling_mean, c='tab:red', lw=2, label='rolling window')
plt.plot(rolling_mean - 2 * rolling_sd, c='tab:red', lw=2)
plt.plot(rolling_mean + 2 * rolling_sd, c='tab:red', lw=2)
plt.suptitle('Estimating μ and σ from noisy data')
plt.title('Gaussian process vs. rolling window', fontsize=10)
plt.xlabel('sample index')
plt.ylabel('y')
plt.legend()
plt.xlim(400, 1000); plt.ylim(300, 1200)
plt.gcf().set_size_inches(5, 3.5)
plt.show()
Using torch.distributions to learn the distribution.
import numpy as np
import matplotlib.pyplot as plt
import random
random.seed(0)
np.random.seed(0)
look_back = 128
look_ahead = 32
split = 0.15
#
# Create data
#
data_length = 10_000
y_mean = np.empty(data_length)
y_sd = np.empty_like(y_mean)
y_rv = np.empty_like(y_mean)
for ii in range(data_length):
mean = ii + 2
sd = ii**2 / (data_length * 10)
y_mean[ii] = mean
y_sd[ii] = sd
#Sample an rv from the Normal(mean, std) distribution
#This will be the noisy input to the model
y_rv[ii] = np.random.normal(loc=mean, scale=sd)
#
# Construct input/target data
#
train_input = []
train_target_rv = []
for ii in range(data_length - look_back - look_ahead):
#For ii, train will use a lookBack-sized window starting at ii
train_input.append(np.array(y_rv[ii:ii + look_back]))
#Use it to predict the value(s) that's lookAHead beyond the window end
train_target_rv.append(y_rv[ii + look_back + look_ahead])
train_input = np.array(train_input)
train_target_rv = np.array(train_target_rv).reshape(-1, 1)
#
#View a window of data
#
#Select window to plot
window_num = 9000
window_end = window_num + look_back
window_range = list(range(window_num, window_end))
window_range_target = list(range(window_num, window_end + look_ahead))
#Training data over window i
plt.plot(window_range, train_input[window_num, :],
color='tab:green', label=f'input data window #{window_num}', zorder=3)
# over window i + lookAHead
plt.plot(window_range_target, y_mean[window_num:window_end + look_ahead],
color='tab:purple', linestyle='--', linewidth=3,
label=rf'$\mu$[{window_num}:{window_num}+{look_back}+{look_ahead}]')
plt.xlabel('index')
plt.ylabel('y')
#y std
plt.plot(window_range_target,
y_mean[window_num:window_end + look_ahead] +\
2 * y_sd[window_num:window_end + look_ahead],
color='tab:purple', linewidth=3, label=r'±2$\sigma$')
plt.plot(window_range_target,
y_mean[window_num:window_end + look_ahead] -\
2 * y_sd[window_num:window_end + look_ahead],
color='tab:purple', linewidth=3)
plt.ylabel('y')
plt.axvline(x=window_end - 1, color='k', label=f'look_back={look_back} window ends')
plt.gcf().set_size_inches(7, 2.5)
plt.gcf().legend(bbox_to_anchor=(0.8, 1.2), ncol=2, fontsize=10)
plt.margins(x=0)
plt.show()
#
#Model
#
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
class Encoder(nn.Module):
def __init__(self):
super().__init__()
intermediate_dim = look_back // 2
self.feed_fwd = nn.Sequential(
nn.Linear(look_back, intermediate_dim),
nn.ReLU(),
nn.BatchNorm1d(intermediate_dim),
nn.Linear(intermediate_dim, intermediate_dim),
nn.ReLU(),
nn.BatchNorm1d(intermediate_dim)
)
self.mean_encoder = nn.Sequential(
nn.Linear(intermediate_dim, intermediate_dim),
nn.ReLU(),
nn.Linear(intermediate_dim, 1),
)
self.std_encoder = nn.Sequential(
nn.Linear(intermediate_dim, intermediate_dim),
nn.ReLU(),
nn.Linear(intermediate_dim, 1),
nn.Softplus()
)
def forward(self, x):
x = self.feed_fwd(x)
mu = self.mean_encoder(x)
sigma = self.std_encoder(x)
return torch.distributions.Normal(mu, sigma)
torch.manual_seed(0)
encoder = Encoder()
optimizer = optim.NAdam(encoder.parameters())
n_epochs = 150
batch_size = 32
X_t = torch.tensor(train_input).float()
y_t = torch.tensor(train_target_rv).float()
Xy_paired = list(zip(X_t, y_t))
dataloader = DataLoader(Xy_paired, batch_size=batch_size, shuffle=True)
losses = []
encoder.train()
for epoch in range(n_epochs):
running_loss = 0
for batch_num, (X, y) in enumerate(dataloader):
predicted_dist = encoder(X)
nll_loss = -predicted_dist.log_prob(y).mean(dim=0)
optimizer.zero_grad()
nll_loss.backward()
optimizer.step()
running_loss += nll_loss.item()
if not (epoch % 10): print(epoch, '{:.0f}'.format( running_loss / len(dataloader)))
losses.append(running_loss / len(dataloader))
plt.plot(losses)
plt.gcf().set_size_inches(5, 3)
plt.show()
#Get predictions
encoder.eval()
with torch.no_grad():
prediction = encoder(X_t)
pred_mean = prediction.mean.numpy()
pred_sd = prediction.stddev.numpy()
sample_from_prediction = prediction.sample()
#Plot
x_pred = list(range(look_back, data_length - look_ahead))
plt.plot(x_pred, y_rv[x_pred], c='tab:brown', lw=2, alpha=0.4, label='input data')
plt.plot(x_pred, y_mean[x_pred], c='black', lw=5, label=r'actual $\mu$')
plt.plot(x_pred, y_mean[x_pred] + 2 * y_sd[x_pred], c='black', lw=4, ls=':', label=r'actual 2$\sigma$')
plt.plot(x_pred, y_mean[x_pred] - 2 * y_sd[x_pred], c='black', lw=4, ls=':')
# plt.plot(x_pred, sample_from_prediction, c='tab:cyan', lw=0.3, label='sample rv from prediction')
plt.plot(x_pred, pred_mean, c='tab:cyan', lw=0.3, label=r'predicted $\mu$')
plt.plot(x_pred, pred_mean + 2 * pred_sd, c='tab:red', lw=0.5, label=r'predicted 2$\sigma$')
plt.plot(x_pred, pred_mean - 2 * pred_sd, c='tab:red', lw=0.5)
plt.xlabel('index')
plt.ylabel('y')
plt.title('Input, ground truth, and predictions')
plt.gcf().set_size_inches(5, 3)
plt.gcf().legend(ncol=2, bbox_to_anchor=(0.9, 1.25))
# plt.xlim(6_000, 10_000); plt.ylim(4_000, 12_000)
plt.show()