I am trying to change my model to run on multiple GPUs using mirrored_strategy. I can reproduce my problem with a simpler model here: https://colab.research.google.com/drive/16YlKuzdluryVRmcM680tjtLWfPjt5qhS
But this is the important part of the code:
def loss_object(target_y, pred_y):
    # Negative soft Dice coefficient: -2*sum(p*t) / (sum(p^2) + sum(t^2))
    pred_ssum = tf.math.reduce_sum(tf.math.square(pred_y))
    target_ssum = tf.math.reduce_sum(tf.math.square(target_y))
    mul_sum = tf.math.reduce_sum(tf.math.multiply(pred_y, target_y))
    return tf.math.divide(-2 * mul_sum, tf.math.add(pred_ssum, target_ssum))
EPOCHS = 10
model = MyModel()
optimizer = tf.keras.optimizers.RMSprop(lr=2e-5)
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')
@tf.function
def distributed_train_step(images, labels):
    per_replica_losses = mirrored_strategy.experimental_run_v2(train_step, args=(images, labels,))
    return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                                    axis=None)

@tf.function
def distributed_test_step(images, labels):
    return mirrored_strategy.experimental_run_v2(test_step, args=(images, labels,))
@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_object(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    #train_loss(loss)
    train_accuracy.update_state(labels, predictions)

@tf.function
def test_step(images, labels):
    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)
    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)
for epoch in range(EPOCHS):
    # Reset the metrics at the start of the next epoch
    total_loss = 0.0
    num_batches = 0
    for images, labels in train_ds:
        #train_step(images, labels)
        total_loss += distributed_train_step(images, labels)
        num_batches += 1
    train_loss = total_loss/num_batches

    for test_images, test_labels in test_ds:
        #test_step(test_images, test_labels)
        distributed_test_step(test_images, test_labels)

    template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print(template.format(epoch+1, train_loss, train_accuracy.result()*100, test_loss.result(), test_accuracy.result()*100))

    train_accuracy.reset_states()
    test_loss.reset_states()
    test_accuracy.reset_states()
All of the code above sits inside mirrored_strategy.scope(). The model simply takes a (4,4,4) cube of constant values and passes it through a 3D CNN layer and a 3D CNN transpose layer to produce the same (4,4,4) cube as output.
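For context, the model looks roughly like this (reconstructed from the traceback below and the corrected version in the answer; note that it enters mirrored_strategy.scope() again inside __init__ and call, which is exactly the line the traceback points at):

class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        # entering the strategy scope again inside the model
        with mirrored_strategy.scope():
            self.cnn_down_1 = layers.Conv3D(1, (2, 2, 2), strides=2, padding='same')
            self.cnn_up_1 = layers.Conv3DTranspose(1, (2, 2, 2), strides=2, padding='same')

    def call(self, inputs):
        with mirrored_strategy.scope():
            x = self.cnn_down_1(inputs)
            return self.cnn_up_1(x)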
However, I get an error saying:
RuntimeError Traceback (most recent call last)
<ipython-input-19-93fb783af116> in <module>()
65 for images, labels in train_ds:
66 #train_step(images, labels)
---> 67 total_loss += distributed_train_step(images, labels)
68 num_batches += 1
69 train_loss = total_loss/num_batches
8 frames
/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/func_graph.py in wrapper(*args, **kwargs)
966 except Exception as e: # pylint:disable=broad-except
967 if hasattr(e, "ag_error_metadata"):
--> 968 raise e.ag_error_metadata.to_exception(e)
969 else:
970 raise
RuntimeError: in user code:
<ipython-input-19-93fb783af116>:32 distributed_train_step *
per_replica_losses = mirrored_strategy.experimental_run_v2(train_step, args=(images, labels,))
/usr/local/lib/python3.6/dist-packages/tensorflow/python/distribute/mirrored_strategy.py:770 _call_for_each_replica *
fn, args, kwargs)
<ipython-input-19-93fb783af116>:43 train_step *
predictions = model(images, training=True)
<ipython-input-14-cb5f0d1313e2>:9 call *
with mirrored_strategy.scope():
/usr/local/lib/python3.6/dist-packages/tensorflow/python/distribute/distribute_lib.py:291 __enter__
self._context.strategy.extended)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/distribute/distribute_lib.py:214 _require_cross_replica_or_default_context_extended
raise RuntimeError("Method requires being in cross-replica context, use "
RuntimeError: Method requires being in cross-replica context, use get_replica_context().merge_call()
Has anyone run into a similar problem? Any suggestions would be greatly appreciated.
Thanks! Stay safe.
As per the discussion, the model was indeed the cause of this error. Below is corrected, working code for this problem.
Changing the data type in your dataset from int to float will also prevent a TypeError later on.
from __future__ import absolute_import, division, print_function, unicode_literals
!pip install tf-nightly
#%tensorflow_version 2.x
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, models, Model
import numpy as np
mirrored_strategy = tf.distribute.MirroredStrategy()
def train_gen():
    for i in range(10):
        yield tf.constant(i, shape=(4,4,4,1)), tf.constant(i, shape=(4,4,4,1))

def test_gen():
    for i in range(2):
        yield tf.constant(i+10, shape=(4,4,4,1)), tf.constant(i+10, shape=(4,4,4,1))
BATCH_SIZE_PER_REPLICA = 2
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * mirrored_strategy.num_replicas_in_sync
train_ds = tf.data.Dataset.from_generator(
    train_gen,
    output_types=(tf.float32, tf.float32),  # using float as your datatype
    output_shapes=((4,4,4,1), (4,4,4,1))
)

test_ds = tf.data.Dataset.from_generator(
    test_gen,
    output_types=(tf.float32, tf.float32),  # using float as your datatype
    output_shapes=((4,4,4,1), (4,4,4,1))
)

train_ds = train_ds.batch(GLOBAL_BATCH_SIZE)
test_ds = test_ds.batch(GLOBAL_BATCH_SIZE)
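As a quick sanity check (not part of the original answer, just for inspection), each batch should carry the global batch size in its leading dimension, since the strategy later splits it across replicas:

for x, y in train_ds.take(1):
    print(x.shape, y.shape)  # expected: (GLOBAL_BATCH_SIZE, 4, 4, 4, 1) for both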
In your model, the mirrored_strategy.scope() calls are what cause the error you are getting. Removing them, as commented out below, solves the problem.
class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        #with mirrored_strategy.scope():
        self.cnn_down_1 = layers.Conv3D(1, (2, 2, 2), strides=2, padding='same')
        self.cnn_up_1 = layers.Conv3DTranspose(1, (2, 2, 2), strides=2, padding='same')

    def call(self, inputs):
        #with mirrored_strategy.scope():
        x = self.cnn_down_1(inputs)
        return self.cnn_up_1(x)

assert tf.distribute.get_replica_context() is not None  # default context
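A quick standalone shape check (a hypothetical throwaway instance created outside the strategy scope, not part of the fix) confirms the architecture described in the question: Conv3D with stride 2 halves each spatial dimension and Conv3DTranspose with stride 2 restores it, so a (4,4,4) cube maps back to a (4,4,4) cube.

check_model = MyModel()                 # throwaway instance, for the shape check only
dummy = tf.zeros((1, 4, 4, 4, 1))       # (batch, depth, height, width, channels)
print(check_model(dummy).shape)         # expected: (1, 4, 4, 4, 1)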
In the code below, it is also necessary to remove the @tf.function decorator from the train_step and test_step functions; only the distributed_train_step and distributed_test_step wrappers keep it.
with mirrored_strategy.scope():
    #assert tf.distribute.get_replica_context() is not None # default
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True,
        reduction=tf.keras.losses.Reduction.NONE)

    # Immediately rebound to the custom (negative soft Dice) loss below
    def loss_object(target_y, pred_y):
        pred_ssum = tf.math.reduce_sum(tf.math.square(pred_y))
        target_ssum = tf.math.reduce_sum(tf.math.square(target_y))
        mul_sum = tf.math.reduce_sum(tf.math.multiply(pred_y, target_y))
        return tf.math.divide(-2 * mul_sum, tf.math.add(pred_ssum, target_ssum))
    EPOCHS = 10
    model = MyModel()
    optimizer = tf.keras.optimizers.RMSprop(lr=2e-5)

    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
    test_loss = tf.keras.metrics.Mean(name='test_loss')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')
    #@tf.function
    def train_step(images, labels):
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = loss_object(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        #train_loss(loss)
        train_accuracy.update_state(labels, predictions)
        return loss
    #@tf.function
    def test_step(images, labels):
        predictions = model(images, training=False)
        t_loss = loss_object(labels, predictions)
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)
    @tf.function
    def distributed_train_step(images, labels):
        assert tf.distribute.get_replica_context() is None
        per_replica_losses = mirrored_strategy.experimental_run_v2(train_step, args=(images, labels,))
        return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                                        axis=None)

    @tf.function
    def distributed_test_step(images, labels):
        return mirrored_strategy.experimental_run_v2(test_step, args=(images, labels,))
    for epoch in range(EPOCHS):
        # Reset the metrics at the start of the next epoch
        #train_loss.reset_states()
        total_loss = 0.0
        num_batches = 0
        for images, labels in train_ds:
            #train_step(images, labels)
            total_loss += distributed_train_step(images, labels)
            num_batches += 1
        train_loss = total_loss/num_batches

        for test_images, test_labels in test_ds:
            #test_step(test_images, test_labels)
            distributed_test_step(test_images, test_labels)

        template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
        print(template.format(epoch+1, train_loss, train_accuracy.result()*100, test_loss.result(), test_accuracy.result()*100))

        train_accuracy.reset_states()
        test_loss.reset_states()
        test_accuracy.reset_states()
This resolves the issue and eliminates the error; the code now runs correctly. Hope this helps.