RuntimeError: Method requires being in cross-replica context, use get_replica_context().merge_call() when using tf.distribute.MirroredStrategy

Problem description

I am trying to change my model to run on multiple GPUs using mirrored_strategy. I can reproduce my problem with a simpler model here: https://colab.research.google.com/drive/16YlKuzdluryVRmcM680tjtLWfPjt5qhS

But here is the important part of the code:

def loss_object(target_y, pred_y):
    pred_ssum = tf.math.reduce_sum(tf.math.square(pred_y))
    target_ssum = tf.math.reduce_sum(tf.math.square(target_y))
    mul_sum = tf.math.reduce_sum(tf.math.multiply(pred_y, target_y))
    return tf.math.divide(-2 * mul_sum, tf.math.add(pred_ssum, target_ssum))

EPOCHS = 10



model = MyModel()

optimizer = tf.keras.optimizers.RMSprop(lr=2e-5)

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

@tf.function
def distributed_train_step(images, labels):
    per_replica_losses = mirrored_strategy.experimental_run_v2(train_step, args=(images, labels,))
    return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                        axis=None)

@tf.function
def distributed_test_step(images, labels):
    return mirrored_strategy.experimental_run_v2(test_step, args=(images, labels,))

@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_object(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    #train_loss(loss)
    train_accuracy.update_state(labels, predictions)

@tf.function
def test_step(images, labels):
    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)

for epoch in range(EPOCHS):
    # Reset the metrics at the start of the next epoch
    total_loss = 0.0
    num_batches = 0

    for images, labels in train_ds:
        #train_step(images, labels)
        total_loss += distributed_train_step(images, labels)
        num_batches += 1
    train_loss = total_loss/num_batches

    for test_images, test_labels in test_ds:
        #test_step(test_images, test_labels)
        distributed_test_step(test_images, test_labels)

    template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print(template.format(epoch+1, train_loss, train_accuracy.result()*100, test_loss.result(), test_accuracy.result()*100))

    train_accuracy.reset_states()
    test_loss.reset_states()
    test_accuracy.reset_states()

All of the code above sits inside mirrored_strategy.scope(). The model simply takes a (4,4,4) cube of constant values and passes it through a 3D_CNN layer and a 3D_CNN_Transpose layer to produce the same (4,4,4) cube as output; a sketch of it follows.
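For context, the model in the notebook looks roughly like this (a sketch reconstructed from the traceback and from the corrected code in the answer below; the mirrored_strategy.scope() entered inside call is what raises the RuntimeError, because call already executes in a per-replica context):

class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        with mirrored_strategy.scope():  # scope entered inside the model
            self.cnn_down_1 = layers.Conv3D(1, (2, 2, 2), strides=2, padding='same')
            self.cnn_up_1 = layers.Conv3DTranspose(1, (2, 2, 2), strides=2, padding='same')

    def call(self, inputs):
        with mirrored_strategy.scope():  # re-entering the scope here, while the
            x = self.cnn_down_1(inputs)  # replicas are already running, triggers
            return self.cnn_up_1(x)      # "Method requires being in cross-replica context"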

However, when I run this I get the following error:

RuntimeError                              Traceback (most recent call last)
<ipython-input-19-93fb783af116> in <module>()
     65         for images, labels in train_ds:
     66             #train_step(images, labels)
---> 67             total_loss += distributed_train_step(images, labels)
     68             num_batches += 1
     69         train_loss = total_loss/num_batches

8 frames
/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/func_graph.py in wrapper(*args, **kwargs)
    966           except Exception as e:  # pylint:disable=broad-except
    967             if hasattr(e, "ag_error_metadata"):
--> 968               raise e.ag_error_metadata.to_exception(e)
    969             else:
    970               raise

RuntimeError: in user code:

    <ipython-input-19-93fb783af116>:32 distributed_train_step  *
        per_replica_losses = mirrored_strategy.experimental_run_v2(train_step, args=(images, labels,))
    /usr/local/lib/python3.6/dist-packages/tensorflow/python/distribute/mirrored_strategy.py:770 _call_for_each_replica  *
        fn, args, kwargs)
    <ipython-input-19-93fb783af116>:43 train_step  *
        predictions = model(images, training=True)
    <ipython-input-14-cb5f0d1313e2>:9 call  *
        with mirrored_strategy.scope():
    /usr/local/lib/python3.6/dist-packages/tensorflow/python/distribute/distribute_lib.py:291 __enter__
        self._context.strategy.extended)
    /usr/local/lib/python3.6/dist-packages/tensorflow/python/distribute/distribute_lib.py:214 _require_cross_replica_or_default_context_extended
        raise RuntimeError("Method requires being in cross-replica context, use "

    RuntimeError: Method requires being in cross-replica context, use get_replica_context().merge_call()

Has anyone run into a similar problem? Any suggestions would be greatly appreciated.

Thanks! Stay safe.

python python-3.x tensorflow tensorflow2.0
1 Answer

Based on the discussion, the model was indeed the cause of this error. The corrections below produce working, runnable code for this problem.

Changing the datatype in your dataset from int to float will prevent a TypeError later on:

from __future__ import absolute_import, division, print_function, unicode_literals
!pip install tf-nightly
#%tensorflow_version 2.x
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras import datasets, layers, models, Model
import numpy as np

mirrored_strategy = tf.distribute.MirroredStrategy()

def train_gen():
    for i in range(10):
      yield tf.constant(i, shape=(4,4,4,1)), tf.constant(i, shape=(4,4,4,1))

def test_gen():
    for i in range(2):
      yield tf.constant(i+10, shape=(4,4,4,1)), tf.constant(i+10, shape=(4,4,4,1))

BATCH_SIZE_PER_REPLICA = 2
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * mirrored_strategy.num_replicas_in_sync

train_ds = tf.data.Dataset.from_generator(
    train_gen,
    output_types=(tf.float32, tf.float32),    # using float as your datatype
    output_shapes=((4,4,4,1), (4,4,4,1))
)

test_ds = tf.data.Dataset.from_generator(
    test_gen,
    output_types=(tf.float32, tf.float32),      # using float as your datatype
    output_shapes=((4,4,4,1), (4,4,4,1))
)

train_ds = train_ds.batch(GLOBAL_BATCH_SIZE)
test_ds = test_ds.batch(GLOBAL_BATCH_SIZE)
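(An optional refinement, not part of the original answer: the official tf.distribute custom-training tutorial additionally wraps the batched datasets with Strategy.experimental_distribute_dataset, so that each replica receives its own slice of the global batch. Something like:)

# Optional, assuming the setup above: let the strategy split each global
# batch across replicas instead of copying the full batch to every replica.
train_dist_ds = mirrored_strategy.experimental_distribute_dataset(train_ds)
test_dist_ds = mirrored_strategy.experimental_distribute_dataset(test_ds)

(The training loop further down would then iterate over train_dist_ds and test_dist_ds instead of train_ds and test_ds.)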

In your model, the mirrored_strategy.scope() blocks are what cause the error you are seeing. Removing them, as shown (commented out) below, resolves the issue:

class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        #with mirrored_strategy.scope():
        self.cnn_down_1 = layers.Conv3D(1, (2, 2, 2), strides=2, padding='same')
        self.cnn_up_1 = layers.Conv3DTranspose(1, (2, 2, 2), strides=2, padding='same')

    def call(self, inputs):
        #with mirrored_strategy.scope():
        x = self.cnn_down_1(inputs)
        return self.cnn_up_1(x)

assert tf.distribute.get_replica_context() is not None  # default

In the code below, the @tf.function decorator also has to be removed from the train_step and test_step functions (it is kept only on the outer distributed_train_step and distributed_test_step):

with mirrored_strategy.scope():
    #assert tf.distribute.get_replica_context() is not None  # default

    # This leftover from the tf.distribute tutorial is immediately shadowed by
    # the custom loss defined below, so it is commented out:
    #loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    #    from_logits=True, reduction=tf.keras.losses.Reduction.NONE)

    def loss_object(target_y, pred_y):
        pred_ssum = tf.math.reduce_sum(tf.math.square(pred_y))
        target_ssum = tf.math.reduce_sum(tf.math.square(target_y))
        mul_sum = tf.math.reduce_sum(tf.math.multiply(pred_y, target_y))
        return tf.math.divide(-2 * mul_sum, tf.math.add(pred_ssum, target_ssum))

    EPOCHS = 10

    model = MyModel()

    optimizer = tf.keras.optimizers.RMSprop(lr=2e-5)

    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

    test_loss = tf.keras.metrics.Mean(name='test_loss')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

    #@tf.function   # removed: train_step runs inside the distributed step below
    def train_step(images, labels):
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = loss_object(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        #train_loss(loss)
        train_accuracy.update_state(labels, predictions)
        return loss

    #@tf.function   # removed: test_step runs inside the distributed step below
    def test_step(images, labels):
        predictions = model(images, training=False)
        t_loss = loss_object(labels, predictions)

        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)

    @tf.function
    def distributed_train_step(images, labels):
        assert tf.distribute.get_replica_context() is None
        per_replica_losses = mirrored_strategy.experimental_run_v2(train_step, args=(images, labels,))
        return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                                        axis=None)

    @tf.function
    def distributed_test_step(images, labels):
        return mirrored_strategy.experimental_run_v2(test_step, args=(images, labels,))

    for epoch in range(EPOCHS):
        # Reset the metrics at the start of the next epoch
        #train_loss.reset_states()
        total_loss = 0.0
        num_batches = 0

        for images, labels in train_ds:
            #train_step(images, labels)
            total_loss += distributed_train_step(images, labels)
            num_batches += 1
        train_loss = total_loss / num_batches

        for test_images, test_labels in test_ds:
            #test_step(test_images, test_labels)
            distributed_test_step(test_images, test_labels)

        template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
        print(template.format(epoch+1, train_loss, train_accuracy.result()*100,
                              test_loss.result(), test_accuracy.result()*100))

        train_accuracy.reset_states()
        test_loss.reset_states()
        test_accuracy.reset_states()
This resolves the problem and properly eliminates the error; the code now runs. Hope this helps.
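One last note beyond the original answer: starting with TensorFlow 2.2, Strategy.experimental_run_v2 was renamed to Strategy.run, so on newer versions the distributed steps would be written as:

@tf.function
def distributed_train_step(images, labels):
    # tf.distribute.Strategy.run replaces experimental_run_v2 in TF >= 2.2
    per_replica_losses = mirrored_strategy.run(train_step, args=(images, labels))
    return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM,
                                    per_replica_losses, axis=None)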