Using a Bayesian neural network to learn the noise in the training data

Problem description

I am trying to train a Bayesian neural network for noisy time-series prediction. I am having the following problems:

  1. Getting the model to learn the linear relationship in the data
  2. Getting the model to learn the increasing noise

The goal is for the model to learn the noise so that, after training, it can quantify the uncertainty caused by the noise.

The data:
import numpy as np
import matplotlib.pyplot as plt
lookBack = 128
inputDim = 1
lookAHead = 32
split = 0.15
# create data
data_length = 10000
y_data = []
for ii in range(data_length):
    y_data.append(0.5*ii + 2 + np.random.normal(0, ii*ii/100000))
x_data = np.arange(data_length)

#normalize data
y_data = np.array(y_data)
y_data = (y_data - np.mean(y_data)) / np.std(y_data)

plt.plot(x_data, y_data)
plt.show()

#construct input/target data
train_data_input = []
train_data_target = []
for ii in range(y_data.shape[0] - lookBack - lookAHead):
    train_data_input.append(np.array(y_data[ii:ii+lookBack]))
    train_data_target.append(y_data[ii+lookBack + lookAHead])

train_data_input = np.array(train_data_input)
train_data_input = np.expand_dims(train_data_input, axis = -1)

train_data_target = np.array(train_data_target)

My model:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_probability as tfp
from tensorflow.keras.layers import Dense, Input

inputModel = Input(shape=(lookBack, inputDim))
features = keras.layers.Flatten()(inputModel)
features = layers.Dense(32, activation = "tanh")(features)
features = tfp.layers.DenseVariational(
            units=8,
            make_prior_fn=prior,          # one of prior1/prior2 defined below
            make_posterior_fn=posterior,  # one of posterior1/posterior2 defined below
            kl_weight=1/(train_data_input.shape[0]*(1-split)),
            activation="tanh",
            )(features)

I tried different output types, either just a regression value or a learned distribution:

if output_type == "val":
    outputs = layers.Dense(1)(features)

if output_type == "dist":
    distribution_params = Dense(2)(features)
    #c = np.log(np.expm1(1.0))
    #outputs = tfp.layers.DistributionLambda(lambda t: tfp.distributions.Normal(loc=t[..., :1],
    #                       scale=1e-3 + tf.math.softplus(c+0.05 * t[..., 1:])))(distribution_params)
    outputs = tfp.layers.IndependentNormal(1)(distribution_params)


model = keras.Model(inputs=inputModel, outputs=outputs)

def negative_loglikelihood(targets, estimated_distribution):
    return -estimated_distribution.log_prob(targets)

loss = None
if output_type == "dist"
    loss = negative_loglikelihood 
    #def loss(y_true, y_pred):
    #    return tf.keras.losses.KLD(y_true, y_pred)
if output_type == "val":
    loss = tf.keras.losses.MeanSquaredError()

optimizer = keras.optimizers.Adam()

model.compile(
        optimizer=optimizer,
        loss=loss
    )

model.fit(x = train_data_input, y = train_data_target, epochs=25, validation_split=split)

I tried different unit sizes, loss functions (-log_prob, KLD), activation functions, and (learnable) priors (1/2, see below) and posteriors (1/2, see below), but my model does not really learn anything useful. Moreover, I expected the stdDev of the output to increase with increasing x, since the noise increases as well, but that is not the case.

def prior1(kernel_size, bias_size, dtype=None):
    n = kernel_size + bias_size
    prior_model = keras.Sequential(
        [
            tfp.layers.DistributionLambda(
                lambda t: tfp.distributions.MultivariateNormalDiag(
                    loc=tf.zeros(n), scale_diag=tf.ones(n)
                )
            )
        ]
    )
    return prior_model

def prior2(kernel_size, bias_size, dtype=None):
    n = kernel_size + bias_size
    c = np.log(np.expm1(1.0))
    return tf.keras.Sequential([
        tfp.layers.VariableLayer(2*n, dtype=dtype),
        tfp.layers.DistributionLambda(lambda t: tfp.distributions.Independent(
            tfp.distributions.Normal(loc=t[..., :n], scale=1e-3 + tf.math.softplus(c + 0.05 * t[..., n:])),
            reinterpreted_batch_ndims=1))
    ])

def posterior1(kernel_size, bias_size, dtype=None):
    n = kernel_size + bias_size
    posterior_model = keras.Sequential(
        [
           tfp.layers.VariableLayer(
                tfp.layers.MultivariateNormalTriL.params_size(n), dtype=dtype
            ),
            tfp.layers.MultivariateNormalTriL(n),
        ]
    )
    return posterior_model
    
def posterior2(kernel_size, bias_size=0, dtype=None):
    n = kernel_size + bias_size
    c = np.log(np.expm1(1.0))
        
    return keras.Sequential([
        tfp.layers.VariableLayer(2 * n, dtype=dtype),
        tfp.layers.DistributionLambda(lambda t: tfp.distributions.Independent(
            tfp.distributions.Normal(loc=t[..., :n], 
                       scale=1e-5 + tf.nn.softplus(c + t[..., n:])),
            reinterpreted_batch_ndims=1)),
    ])

Model evaluation:

predictions = []
means = []
stds = []
for ii in range(20):
    prediction = model(train_data_input)
    if output_type == "val":
        predictions.append(prediction)
    if output_type == "dist":
        means.append(prediction.mean())
        stds.append(prediction.stddev())
        
if output_type == "val":
    predictions = np.array(predictions)
    predicted_mean = np.mean(predictions, axis = 0)
    predicted_std = np.std(predictions, axis = 0)
        
if output_type == "dist":
    predicted_mean = np.mean(np.array(means), axis = 0)
    predicted_std = np.mean(np.array(stds), axis = 0)
    
lower_bound = predicted_mean - predicted_std
upper_bound = predicted_mean + predicted_std

plt.plot(x_data, y_data)
#Plot the mean
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(predicted_mean), color = "r")
#Plot the beginning of the test set
plt.vlines(x=(1-split)*predicted_mean.shape[0] + lookBack + lookAHead, ymin = np.min(predicted_mean), ymax = np.max(predicted_mean), colors = "g")
#Plot the hopefully increasing stdDev
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, predicted_std, color = "g")
#Plot mean +/- stdDev
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(lower_bound), color = "r", alpha = 0.5)
plt.plot(np.arange(predicted_mean.shape[0]) + lookBack + lookAHead, np.squeeze(upper_bound), color = "r", alpha = 0.5)

I would appreciate any advice on how to get the model to learn.

tensorflow bayesian confidence-interval tensorflow-probability bayesian-deep-learning
1 Answer

In the code below, a random input signal (y_rv) is generated from a known mean and standard deviation (y_mean, y_std) at each time point. The model takes a window of the input signal and learns to predict the mean and standard deviation that were used to sample that signal (i.e. the input is y_rv, and the targets are y_mean and y_std).

I first simplified the model down to a single-output regression model predicting the mean. ReLU activations helped stabilize training. Once the basic model was working, I added the second output for the standard deviation.
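
For reference, a minimal sketch of that mean-only baseline (layer sizes mirror the full listing below; train_input_scaled and train_target_mean are built there):

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

lookBack, lookAHead = 128, 32

#Single-output regression: predict only the mean sequence
inputs = layers.Input(shape=(lookBack,))
features = layers.Dense(64, activation="relu")(inputs)
features = layers.Dense(32, activation="relu")(features)
mean_out = layers.Dense(lookAHead, name='mean_out')(features)

baseline = keras.Model(inputs=inputs, outputs=mean_out)
baseline.compile(optimizer='adam', loss='mse')
#baseline.fit(train_input_scaled, train_target_mean,
#             epochs=20, batch_size=32, validation_split=0.15)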

I modified the model and targets so that the model predicts the sequence of values up to 32 points into the future, rather than just a single value at t=32. I think this gives the model more context for predicting the standard deviation, instead of having to rely on a single data point. Adding dropout also helped the model stabilize its predictions of the standard deviation. Estimating the standard deviation directly worked better than estimating log(std).
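
For comparison, a sketch of the log(std) variant that worked less well here: train the second head against log-transformed std targets and exponentiate the predictions. The eps guard is an addition of mine, needed because the generated std is exactly 0 at ii == 0:

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

lookBack, lookAHead = 128, 32
eps = 1e-6  #std is exactly 0 at ii == 0, so guard the log

inputs = layers.Input(shape=(lookBack,))
features = layers.Dense(64, activation="relu")(inputs)
features = layers.Dropout(rate=0.2)(features)
features = layers.Dense(32, activation="relu")(features)
outputs = [layers.Dense(lookAHead, name='mean_out')(features),
           layers.Dense(lookAHead, name='log_std_out')(features)]

log_model = keras.Model(inputs=inputs, outputs=outputs)
log_model.compile(optimizer='adam', loss='mse', loss_weights=[0.5, 0.5])
#log_model.fit(train_input_scaled,
#              [train_target_mean, np.log(train_target_std + eps)],
#              epochs=20, batch_size=32, validation_split=0.15)
#pred_mean, pred_log_std = [t.numpy() for t in log_model(train_input_scaled)]
#pred_std = np.exp(pred_log_std)  #back to the original scale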

Showing a single window from the training set:

Targets and predictions:

import numpy as np
import matplotlib.pyplot as plt
import random

random.seed(0)
np.random.seed(0)

lookBack = 128
lookAHead = 32
split = 0.15

# create data
data_length = 10_000
y_mean = np.empty(data_length)
y_std = np.empty_like(y_mean)
y_rv = np.empty_like(y_mean)
for ii in range(data_length):
    mean = 0.5*ii + 2
    std = ii**2 / 100_000
    
    y_mean[ii] = mean
    y_std[ii] = std
    #Sample an rv from the Normal(mean, std) distribution
    y_rv[ii] = np.random.normal(loc=mean, scale=std)

#construct input/target data
train_input = []
train_target_mean = []
train_target_std = []
train_target_rv = []

for ii in range(data_length - lookBack - lookAHead):
    #For ii, train will use a lookBack-sized window starting at ii
    train_input.append(np.array(y_rv[ii:ii + lookBack]))
    
    #Use it to predict the value(s) that's lookAHead beyond the window end
    target_start = ii + lookBack
    target_end = target_start + lookAHead
    train_target_mean.append( np.array(y_mean[target_start:target_end]) )
    train_target_std.append( np.array(y_std[target_start:target_end]) )
    train_target_rv.append( np.array(y_rv[target_start:target_end]) )

train_input = np.array(train_input)
train_target_mean = np.array(train_target_mean)
train_target_std = np.array(train_target_std)
train_target_rv = np.array(train_target_rv)

train_input_scaled = (train_input - train_input.mean(axis=0)) / train_input.std(axis=0)

train_target_mean_std = np.stack(
    [train_target_mean, train_target_std],
    axis=2
)

#
#View a window of data
#
#Select window to plot
window_start = 500
window_end = window_start + lookBack

window_range = list(range(window_start, window_end))
window_range_target = list(range(window_start, window_end + lookAHead))

#Training data over window i
plt.plot(window_range, train_input[window_start, :],
         color='tab:green', label=f'train_input[window #{window_start}] (y_rv)', zorder=3)
#Target over window i + lookAHead
plt.plot(window_range_target,
         y_mean[window_start:window_end + lookAHead],
         color='tab:red', linestyle=':',
         label=f'y_mean[{window_start}:{window_start}+{lookBack}+{lookAHead}]')
plt.xlabel('index')
plt.ylabel('y')

#y std
plt.plot(window_range_target,
         y_mean[window_start:window_end + lookAHead] +\
         2 * y_std[window_start:window_end + lookAHead],
         color='tab:purple', linewidth=3, label='±2*y_std')
plt.plot(window_range_target,
         y_mean[window_start:window_end + lookAHead] -\
         2 * y_std[window_start:window_end + lookAHead],
         color='tab:purple', linewidth=3)
plt.ylabel('y')

plt.axvline(x=window_end, linestyle=':', linewidth=1, color='k', label='window ends')

plt.gcf().set_size_inches(7, 2.5)
plt.gcf().legend(bbox_to_anchor=(0.7, 1.1))
plt.margins(x=0)
plt.show()

#
#Model
#
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

#Define model
tf.random.set_seed(0)
tf.keras.backend.clear_session()

inputModel = layers.Input(shape=(lookBack,))
features = layers.Flatten()(inputModel)
features = layers.Dense(64, activation="relu")(features)
features = layers.Dropout(rate=0.2)(features)
features = layers.Dense(32, activation="relu")(features)
outputs = [layers.Dense(lookAHead, name='mean_out')(features),
           layers.Dense(lookAHead, name='std_out')(features)
           ]

#Compile model
model = keras.Model(inputs=inputModel, outputs=outputs)
loss = tf.keras.losses.MeanSquaredError()
model.compile(optimizer='adam', loss=loss, loss_weights=[0.5, 0.5])

#Fit model
model.fit(x=train_input_scaled, y=[train_target_mean, train_target_std],
          epochs=20, batch_size=32, validation_split=split)

#
# Plot input, targets, and predictions
#
#Get predictions
pred_mean, pred_std = [t.numpy() for t in model(train_input_scaled)]

#Plot
x_pred = list(range(lookBack, data_length - lookAHead))
plt.plot(x_pred, y_rv[x_pred], c='tab:brown', lw=2, alpha=0.4, label='input features')

plt.plot(x_pred, y_mean[x_pred], c='black', lw=5, label='target mean')
plt.plot(x_pred, y_mean[x_pred] + 2 * y_std[x_pred], c='black', lw=4, ls=':', label='target std')
plt.plot(x_pred, y_mean[x_pred] - 2 * y_std[x_pred], c='black', lw=4, ls=':')

plt.plot(x_pred, pred_mean[:, -1], c='tab:cyan', lw=0.3, label='predicted mean')

plt.plot(x_pred, pred_mean[:, -1] + 2 * pred_std[:, -1],
         c='tab:cyan', lw=0.5, label='predicted std')
plt.plot(x_pred, pred_mean[:, -1] - 2 * pred_std[:, -1],
         c='tab:cyan', lw=0.5)

plt.axvline(x=(1 - split) * len(train_input_scaled),
            c='black', lw=1, ls=':', label='train-val split')
plt.xlabel('index')
plt.ylabel('y')
plt.title('Input, targets, and predictions')
plt.gcf().set_size_inches(5, 3)
plt.gcf().legend(ncol=2, bbox_to_anchor=(0.9, 1.25))
plt.show()

The network only looks at the input features when making predictions and does not use its previous predictions. Allowing the network to access its previous predictions would turn it into a recurrent network, which is the usual choice for time series.
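
For illustration, a minimal sketch of a recurrent variant, using an LSTM over the input window (note this still conditions only on the inputs; a fully autoregressive setup would additionally feed previous predictions back in):

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

lookBack, lookAHead = 128, 32

#Treat the window as a sequence of (timesteps, features)
inputs = layers.Input(shape=(lookBack, 1))
features = layers.LSTM(64)(inputs)  #summarizes the whole window into one vector
outputs = [layers.Dense(lookAHead, name='mean_out')(features),
           layers.Dense(lookAHead, name='std_out')(features)]

rnn_model = keras.Model(inputs=inputs, outputs=outputs)
rnn_model.compile(optimizer='adam', loss='mse', loss_weights=[0.5, 0.5])
#The inputs need a trailing feature axis:
#rnn_model.fit(train_input_scaled[..., None],
#              [train_target_mean, train_target_std],
#              epochs=20, batch_size=32, validation_split=0.15)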

GaussianProcessRegressor in sklearn models the mean and standard deviation of a time series directly, and can provide a good baseline measurement if you want to compare the neural network against other techniques.
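
A sketch of such a baseline, assuming the y_rv array from the listing above. The kernel choice and subsample size are assumptions; a plain WhiteKernel models constant noise, so it will not reproduce the growing noise level exactly, but predict(..., return_std=True) gives a mean and std to compare against:

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel

#GP training is O(n^3), so fit on a subsample of the 10,000 points
rng = np.random.default_rng(0)
idx = np.sort(rng.choice(len(y_rv), size=500, replace=False))
X_train = idx.astype(float).reshape(-1, 1)

kernel = 1.0 * RBF(length_scale=1000.0) + WhiteKernel(noise_level=1.0)
gpr = GaussianProcessRegressor(kernel=kernel, normalize_y=True)
gpr.fit(X_train, y_rv[idx])

X_test = np.arange(0, len(y_rv), 50, dtype=float).reshape(-1, 1)
gp_mean, gp_std = gpr.predict(X_test, return_std=True)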
