我有一个基于 keras.Model 类的自定义模型,我试图保存该模型以供以后使用。虽然保存模型可以正常工作,但重新加载同一模型会引发以下错误:
ValueError: Cannot assign value to variable ' dense_4/kernel:0': Shape mismatch.The variable shape (24, 512), and the assigned value shape (784, 256) are incompatible.
我正在使用 keras 文件格式以及相应的
model.save
和 keras.saving.load_model
方法。
我也尝试了pickle
和dill
,都未能保存模型。
class Encoder
之上的所有内容都是损失计算的数学,可以忽略(可能),但需要存在以实现功能。该模型是一种在 MNIST 数据上训练的变分自动编码器。
首先训练 BaseNet 类。 HigherNet 从经过训练的 BaseNet 接收编码器和解码器,并使用自己的生成器网络再次进行训练。此 HigherNet 已保存,这是我尝试再次加载的模型。除了在 HigherNet 中使用的部分之外,BaseNet 本身不再相关。
错误重现的工作示例:
import tensorflow as tf
import keras
from keras import layers
import numpy as np
import math
def silverman_rule_of_thumb_normal(N):
return tf.pow((4 / (3 * N)), 0.4)
def pairwise_distances(x, y=None):
if y is None:
y = x
distances_tf = tf.norm(x[:, None] - y, axis=-1) ** 2
return tf.cast(distances_tf, dtype=tf.float64)
def cw_normality(X, y=None):
assert len(X.shape) == 2
D = tf.cast(tf.shape(X)[1], tf.float64)
N = tf.cast(tf.shape(X)[0], tf.float64)
if y is None:
y = silverman_rule_of_thumb_normal(N)
# adjusts for dimensionality; D=2 -> K1=1, D>2 -> K1<1
K1 = 1.0 / (2.0 * D - 3.0)
A1 = pairwise_distances(X)
A = tf.reduce_mean(1 / tf.math.sqrt(y + K1 * A1))
B1 = tf.cast(tf.square(tf.math.reduce_euclidean_norm(X, axis=1)), dtype=tf.float64)
B = 2 * tf.reduce_mean((1 / tf.math.sqrt(y + 0.5 + K1 * B1)))
return (1 / tf.sqrt(1 + y)) + A - B
def phi_sampling(s, D):
return tf.pow(1.0 + 4.0 * s / (2.0 * D - 3), -0.5)
def cw_sampling_lcw(first_sample, second_sample, y):
shape = first_sample.get_shape().as_list()
dim = np.prod(shape[1:])
first_sample = tf.reshape(first_sample, [-1, dim])
shape = second_sample.get_shape().as_list()
dim = np.prod(shape[1:])
second_sample = tf.reshape(second_sample, [-1, dim])
assert len(first_sample.shape) == 2
assert first_sample.shape == second_sample.shape
_, D = first_sample.shape
T = 1.0 / (2.0 * tf.sqrt(math.pi * y))
A0 = pairwise_distances(first_sample)
A = tf.reduce_mean(phi_sampling(A0 / (4 * y), D))
B0 = pairwise_distances(second_sample)
B = tf.reduce_mean(phi_sampling(B0 / (4 * y), D))
C0 = pairwise_distances(first_sample, second_sample)
C = tf.reduce_mean(phi_sampling(C0 / (4 * y), D))
return T * (A + B - 2 * C)
def euclidean_norm_squared(X, axis=None):
return tf.reduce_sum(tf.square(X), axis=axis)
def cw_sampling_silverman(first_sample, second_sample):
stddev = tf.math.reduce_std(second_sample)
N = tf.cast(tf.shape(second_sample)[0], tf.float64)
gamma = silverman_rule_of_thumb_normal(N)
return cw_sampling_lcw(first_sample, second_sample, gamma)
@tf.keras.saving.register_keras_serializable()
class Encoder(keras.Model):
def __init__(self, args, **kwargs):
super().__init__(**kwargs)
self.activation = layers.Activation("relu")
self.flatten = layers.Flatten()
self.dense1 = layers.Dense(256)
self.dense2 = layers.Dense(args["latent_dim"], name="z")
def build(self, **kwargs):
encoder_inputs = keras.Input(shape=(28, 28, 1))
x = self.flatten(encoder_inputs)
x = self.dense1(x)
x = self.activation(x)
z = self.dense2(x)
encoder = keras.Model(encoder_inputs, [z], name="encoder")
return encoder
@tf.keras.saving.register_keras_serializable()
class Decoder(keras.Model):
def __init__(self, args, **kwargs):
super().__init__(**kwargs)
self.latent_dim = args["latent_dim"]
self.activation = layers.Activation("relu")
self.dense1 = layers.Dense(256)
self.dense2 = layers.Dense(28 * 28, activation="sigmoid")
self.reshape = layers.Reshape([28, 28, 1])
def build(self, **kwargs):
latent_inputs = keras.Input(shape=(self.latent_dim,))
x = self.dense1(latent_inputs)
x = self.activation(x)
x = self.dense2(x)
decoder_outputs = self.reshape(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="encoder")
return decoder
@tf.keras.saving.register_keras_serializable()
class Generator(keras.Model):
def __init__(self, args, **kwargs):
super().__init__(**kwargs)
self.noise_dim = args["noise_dim"]
self.activation = layers.Activation("relu")
self.dense1 = layers.Dense(512)
self.dense2 = layers.Dense(args["latent_dim"], name="z")
def build(self, **kwargs):
noise_inputs = keras.Input(shape=(self.noise_dim,))
x = self.dense1(noise_inputs)
x = self.activation(x)
z = self.dense2(x)
latent_generator = keras.Model(noise_inputs, [z], name="generator")
return latent_generator
@tf.keras.saving.register_keras_serializable()
class BaseNet(keras.Model):
def __init__(self, args, **kwargs):
super(BaseNet, self).__init__(**kwargs)
self.encoder = Encoder(args).build()
self.decoder = Decoder(args).build()
self.args = args
self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = keras.metrics.Mean(
name="cw_reconstruction_loss"
)
self.cw_loss_tracker = keras.metrics.Mean(name="cw_loss")
def get_config(self):
config = {
"args": self.args
}
base_config = super(BaseNet, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@property
def metrics(self):
return [
self.total_loss_tracker,
self.reconstruction_loss_tracker,
self.cw_loss_tracker,
]
def train_step(self, data):
with tf.GradientTape() as tape:
z = self.encoder(data)
reconstruction = self.decoder(z)
# tf.print(reconstruction)
cw_reconstruction_loss = tf.math.log(
cw_sampling_silverman(data, reconstruction))
lambda_val = 1
cw_loss = lambda_val * tf.math.log(cw_normality(z))
total_loss = cw_reconstruction_loss + cw_loss
grads = tape.gradient(total_loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(cw_reconstruction_loss)
self.cw_loss_tracker.update_state(cw_loss)
return {
"total_loss": self.total_loss_tracker.result(),
"cw_reconstruction_loss": self.reconstruction_loss_tracker.result(),
"cw_loss": self.cw_loss_tracker.result(),
}
@tf.keras.saving.register_keras_serializable()
class HighNet(keras.Model):
def __init__(self, encoder, decoder, args, **kwargs):
super(HighNet, self).__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.args = args
self.generator = Generator(args).build()
self.reconstruction_loss_tracker = keras.metrics.Mean(
name="cw_reconstruction_loss"
)
def get_config(self):
config = {
"encoder": self.encoder,
"decoder": self.decoder,
"args": self.args
}
base_config = super(HighNet, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self, inputs, **kwargs):
x = self.encoder(inputs)
return self.decoder(x)
@property
def metrics(self):
return [
self.reconstruction_loss_tracker
]
def train_step(self, data):
with tf.GradientTape() as tape:
z = self.encoder(data)
batch_size = tf.shape(z)[0]
noise_np = np.random.normal(0, 1, size=self.args["noise_dim"])
noise_tf = tf.expand_dims(tf.convert_to_tensor(noise_np), axis=0)
noise_tf = tf.repeat(noise_tf, repeats=batch_size, axis=0)
noise_z = self.generator(noise_tf)
# tf.print(reconstruction)
cw_reconstruction_loss = tf.math.log(
cw_sampling_silverman(z, noise_z))
grads = tape.gradient(cw_reconstruction_loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
self.reconstruction_loss_tracker.update_state(cw_reconstruction_loss)
return {
"cw_reconstruction_loss": self.reconstruction_loss_tracker.result()
}
def test_saving():
args = {"sample_amount": 1000,
"latent_dim": 24,
"noise_dim": 24,
"epochs": 1,
"batch_size": 128,
"patience": 3,
"learning_rate": 0.0001}
(x_train, y_train), (x_test, _) = keras.datasets.mnist.load_data()
mnist_digits = np.concatenate([x_train, x_test], axis=0)[0:100]
mnist_digits = np.expand_dims(mnist_digits, -1).astype("float32") / 255
base_model = BaseNet(args)
base_model.compile(optimizer=keras.optimizers.Adam(learning_rate=args["learning_rate"]))
es_callback = keras.callbacks.EarlyStopping(monitor='total_loss', patience=args["patience"], mode="min")
base_model.fit(mnist_digits, epochs=args["epochs"], batch_size=args["batch_size"], callbacks=[es_callback])
model = HighNet(base_model.encoder, base_model.decoder, args)
model.compile(optimizer=keras.optimizers.Adam(learning_rate=args["learning_rate"]))
es2_callback = keras.callbacks.EarlyStopping(monitor='cw_reconstruction_loss', patience=args["patience"],
mode="min")
model.fit(mnist_digits, epochs=args["epochs"], batch_size=args["batch_size"], callbacks=[es2_callback])
model.save("high_model.keras", save_format="keras")
loaded_model = keras.saving.load_model("high_model.keras")
if __name__ == "__main__":
test_saving()
保存后模型.summary的输出:
Model: "encoder"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input_1 (InputLayer) [(None, 28, 28, 1)] 0
flatten (Flatten) (None, 784) 0
dense (Dense) (None, 256) 200960
activation (Activation) (None, 256) 0
z (Dense) (None, 24) 6168
=================================================================
Total params: 207128 (809.09 KB)
Trainable params: 207128 (809.09 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
Model: "encoder"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input_2 (InputLayer) [(None, 24)] 0
dense_1 (Dense) (None, 256) 6400
activation_1 (Activation) (None, 256) 0
dense_2 (Dense) (None, 784) 201488
reshape (Reshape) (None, 28, 28, 1) 0
=================================================================
Total params: 207888 (812.06 KB)
Trainable params: 207888 (812.06 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
Model: "generator"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input_3 (InputLayer) [(None, 24)] 0
dense_3 (Dense) (None, 512) 12800
activation_2 (Activation) (None, 512) 0
z (Dense) (None, 24) 12312
=================================================================
Total params: 25112 (98.09 KB)
Trainable params: 25112 (98.09 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
我不知道为什么这个摘要看起来与我的原始摘要不同,但在原始摘要中我可以从“dense_4/kernel:0”得出形状不匹配发生在编码器的第一个密集层中。那里的层用它们的运行时标签进行注释,即dense_4等等。我不知道我省略了什么会导致不同风格的摘要,原始模型有更多层,批量归一化,重用激活函数等。 错误仍然是一样的。我无法想象简单地保存和加载模型会改变任何形状不兼容的情况。
我正在使用 Tensorflow 2.15、Keras 2.15、Python 3.11,并且正在 Ubuntu 23.10 上使用 PyCharm Professional。
当我尝试调试时,我将错误消息缩小到:
ValueError: Layer 'dense_3' expected 0 variables, but received 2 variables during loading. Expected: []
这很奇怪,因为没有 dend_3 层。我设法构建了以下最小示例:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Activation, Input, BatchNormalization, Dropout, Flatten, Identity
args = {"activation": "relu",
"batch_norm": True}
@keras.saving.register_keras_serializable()
class CustomModel1(Model):
def __init__(self):
super().__init__()
self.dense = Dense(32)
def call(self, inputs):
x = self.dense(inputs)
return x
@keras.saving.register_keras_serializable()
class CustomModel2(Model):
def __init__(self):
super().__init__()
self.dense = Dense(32)
def call(self, inputs):
x = self.dense(inputs)
return x
@keras.saving.register_keras_serializable()
class CustomModel3(Model):
def __init__(self):
super().__init__()
self.net1 = CustomModel1()
self.net2 = CustomModel2()
def call(self, inputs):
z = self.net1(inputs)
x = self.net2(z)
return z, x
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
# z, y_pred = self(x) # this fixes it instead
y_pred = self.net2(self.net1(x)) # this line throws the error
loss = self.compiled_loss(y, y_pred)
trainable_vars = self.trainable_weights
gradients = tape.gradient(loss, trainable_vars)
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
self.compiled_metrics.update_state(y, y_pred)
return {m.name: m.result() for m in self.metrics}
# Instantiate the model
model = CustomModel3()
# Compile the model
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
# Create some dummy data for training
x_train = np.random.random((1000, 32))
y_train = np.random.randint(10, size=(1000,))
# Train the model for one epoch
model.fit(x_train, y_train, epochs=1)
# Save the model
model.save('custom_model.keras', save_format='keras')
# Load the model again
loaded_model = tf.keras.models.load_model('custom_model.keras')
# Generate some sample data for prediction
x_sample = np.random.random((10, 32)) # Assuming 10 samples with 32 features each
# Make predictions using the loaded model
predictions = loaded_model.predict(x_sample)
print(predictions)
# Print the predictions
print(model.summary())
通过 self.net1 和 self.net2 inside train_step 自行调用子模型会引发错误,但通过更高模型的 call 方法调用它们并返回各自的值不会引发错误。 我无法理解这如何导致此错误消息。