将`tf.data.dataset`中的数据分发给多个工作者(如Horovod)

问题描述 投票:1回答:1

使用Horovod,你基本上可以运行N个独立的实例(所以它是一种形式的 图间复制),它们通过特殊的Horovod ops(基本上是广播+减少)进行通信。

现在让我们假设实例0,或者其他外部实例加载你的数据(通过 tf.data.Dataset). 你将如何分配 iterator.get_next() 到每个实例?使用Horovod广播会很低效,因为你会把所有的数据复制到所有的实例中。

在每个实例中拥有数据集,并在那里进行所有的加载,然后使用 shard 在数据集上加载数据也是低效的,因为你会到处加载数据,然后扔掉其中的(N-1)N个.所以这就是为什么也不希望有sharding,而是让数据集只在一个单一的(producerdataset worker)实例中加载,然后将批次分布在所有的火车工人上。

我想,TF MultiDeviceIterator 提供了一些类似的功能(或者基本上就是这样),但我不确定它是否能和Horovod一起工作,以及你将如何设置它?

或者你可以通过TF worker来进行分发 (指南? (也许你就是这样配置 MultiDeviceIterator 如果可能的话,应该通过TensorFlow操作函数来实现(有很多相关函数可能已经给了我这个功能,但我可能不知道,或者是误解了它们的工作原理)。)

如果可能的话,应该通过TensorFlow操作函数来实现(有很多相关的函数可能已经给了我这个功能,但我可能不知道,或者是误解了它的工作原理).或者答案可能是TensorFlow还没有提供任何这样的功能?(这对了解情况还是很有用的。那么我会用C++实现自己的解决方案,包装成TensorFlow。但在这样做之前,最好能知道是否真的有必要这样做)。)

(相关的还有 这个Horovod问题.)

(这个问题其实比Horovod更通用一些,虽然Horovod可能是一个很好的例子。对于分布式TensorFlow来说,你可能总是会遇到这个问题?)

(我收集了所有分布式TensorFlow的术语和方面的概述。此处,主要是为了澄清)。)

相关的还有(也许?这个, 这个, 这个, 这个这个 问题)。)

tensorflow tensorflow-datasets horovod
1个回答
2
投票

如你所说,复制每个实例中的数据,并为每个实例进行数据的sharding是不切实际的。

那么,一个解决方案就是将数据过程中的数据分开,让每个实例从数据过程中拉取数据,如下图所示。例如,这种通信可以使用队列建立。

在这样的系统中,数据过程将加载数据集,将其预处理成批次,并将批次推送到队列中。然后,每个训练实例将从这个队列中提取批次。例如,您可以将队列作为生成器传递到数据集 API 中(见 tf.data.Dataset.from_generator.).另外,如果批次的生产速度不够快,可以创建更多的数据进程来提高批次的吞吐量。

根据你的用例,具体的实现方式会有所不同。如需了解更多信息,您可以查找 网络和进程间通信多处理管道和队列.

                                                             Training        
                                                         +--------------+  ++
                                                         |              |   |
                                                    +----+  Instance 1  |   |
                                                    |    |              |   |
                                                    |    +--------------+   |
                                                    |                       |
                      Preprocessing                 |                       |
                  +--------------------+            +---->      X           |
                  |                    |            |                       |
             Load |                    | Batches    +           X           |
    Dataset+------>    Data Process    +--------->Queue                     |  N instances
                  |                    |            +           X           |  Distributed training
                  |                    |            |                       |  For example, using
                  +--------------------+            +---->      X           |  Horovod broadcast + reduce
                                                    |                       |
                                                    |        Training       |
                                                    |    +--------------+   |
                                                    |    |              |   |
                                                    +----+  Instance N  |   |
                                                         |              |   |
                                                         +--------------+  ++

对于一个tensorflow的实现,你可以使用 tf.data.Dataset.shardtf.data.TFRecordDataset.

该文档解决了您对使用TFRecords效率低下的担忧。

重要的注意事项。

  • 在使用任何随机化操作符(如shuffle)之前,一定要先进行shard操作。

  • 一般来说,最好是在数据集流水线的早期使用shard操作符。例如,当从一组TFRecord文件中读取时,在将数据集转换为输入样本之前进行shard。这样可以避免在每个worker上读取每个文件。下面是一个在完整流水线中高效分片策略的例子。

d = Dataset.list_files(pattern)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
© www.soinside.com 2019 - 2024. All rights reserved.