我尝试优化数据输入管道。数据集是GCS上托管的450个TFRecord文件的集合,每个文件的大小约为70MB。该作业是使用GCP ML引擎执行的。没有GPU。
这里是管道:
def build_dataset(file_pattern):
return tf.data.Dataset.list_files(
file_pattern
).interleave(
tf.data.TFRecordDataset,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
buffer_size=2048
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).repeat(
).map(
map_func=_parse_example_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).prefetch(
buffer_size=1
)
具有映射功能:
def _bit_to_float(string_batch: tf.Tensor):
return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
), tf.float32), 2), (tf.shape(string_batch)[0], -1))
def _parse_example_batch(example_batch):
preprocessed_sample_columns = {
"features": tf.io.VarLenFeature(tf.float32),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
dense_float = tf.sparse.to_dense(samples["features"])
bits_to_float = _bit_to_float(samples["booleanFeatures"])
return (
tf.concat([dense_float, bits_to_float], 1),
tf.reshape(samples["label"], (-1, 1))
)
我试图遵循data pipeline tutorial的最佳实践,并对我的映射函数进行矢量化处理(如mrry的建议)。
使用此设置,虽然高速下载数据(带宽约为200MB / s),但CPU使用率不足(14%),并且训练非常慢(一个历时超过1小时)。
我尝试了一些参数配置,更改了interleave()
自变量,例如num_parallel_calls
或cycle_length
或TFRecordDataset
自变量,例如num_parallel_calls
。
最快的配置使用这组参数:
interleave.num_parallel_calls
:1interleave.cycle_length
:8TFRecordDataset.num_parallel_calls
:8有了这个,一个纪元只需要20分钟即可运行。 但是,CPU使用率仅为50%,而带宽消耗约为55MB / s
tf.data.experimental.AUTOTUNE
找不到最佳值来加快训练速度?种类,亚历克西斯。
经过更多的实验,我得出以下解决方案。
interleave
大于0,则删除TFRecordDataset
已经处理的num_parallel_calls
步骤。>>parse_example
和decode_raw
,返回元组`((,),())cache
之后的[map
_bit_to_float
功能作为模型的一部分移动最后,这是数据管道代码:
def build_dataset(file_pattern): return tf.data.TFRecordDataset( tf.data.Dataset.list_files(file_pattern), num_parallel_reads=multiprocessing.cpu_count(), buffer_size=70*1000*1000 ).shuffle( buffer_size=2048 ).map( map_func=split, num_parallel_calls=tf.data.experimental.AUTOTUNE ).batch( batch_size=2048, drop_remainder=True, ).cache( ).repeat( ).prefetch( buffer_size=32 ) def split(example): preprocessed_sample_columns = { "features": tf.io.VarLenFeature(tf.float32), "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""), "label": tf.io.FixedLenFeature((), tf.float32, -1) } samples = tf.io.parse_single_example(example, preprocessed_sample_columns) dense_float = tf.sparse.to_dense(samples["features"]) bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8) return ( (dense_float, bits_to_float), tf.reshape(samples["label"], (1,)) ) def build_model(input_shape): feature = keras.Input(shape=(N,)) bool_feature = keras.Input(shape=(M,), dtype="uint8") one_hot = dataset._bit_to_float(bool_feature) dense_input = tf.reshape( keras.backend.concatenate([feature, one_hot], 1), input_shape) output = actual_model(dense_input) model = keras.Model([feature, bool_feature], output) return model def _bit_to_float(string_batch: tf.Tensor): return tf.dtypes.cast(tf.reshape( tf.bitwise.bitwise_and( tf.bitwise.right_shift( tf.expand_dims(string_batch, 2), tf.reshape( tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8) ), ), tf.constant(0x01, dtype=tf.uint8) ), (tf.shape(string_batch)[0], -1) ), tf.float32)
感谢所有这些优化:
因此,这似乎是一个不错的开始设置。但是CPU和BW仍未过度使用,因此仍然欢迎任何建议!
因此,经过一些基准测试之后,我发现了我认为是我们最好的输入管道:
def build_dataset(file_pattern): tf.data.Dataset.list_files( file_pattern ).interleave( TFRecordDataset, cycle_length=tf.data.experimental.AUTOTUNE, num_parallel_calls=tf.data.experimental.AUTOTUNE ).shuffle( 2048 ).batch( batch_size=64, drop_remainder=True, ).map( map_func=parse_examples_batch, num_parallel_calls=tf.data.experimental.AUTOTUNE ).cache( ).prefetch( tf.data.experimental.AUTOTUNE ) def parse_examples_batch(examples): preprocessed_sample_columns = { "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True), "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""), "label": tf.io.FixedLenFeature((), tf.float32, -1) } samples = tf.io.parse_example(examples, preprocessed_sample_columns) bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8) return ( (samples['features'], bits_to_float), tf.expand_dims(samples["label"], 1) )
所以,新功能:
TFRecordDataset
交织是传统的,所以interleave
功能更好。batch
之前的[map
是一个好习惯(vectorizing your function),并减少了调用映射函数的次数。repeat
。从TF2.0开始,Keras模型API支持数据集API,并且可以使用缓存(请参见SO post)VarLenFeature
切换到FixedLenSequenceFeature
,删除对tf.sparse.to_dense
的无用调用。希望这会有所帮助。仍然欢迎您提出建议。
我尝试优化数据输入管道。数据集是GCS上托管的450个TFRecord文件的集合,每个文件的大小约为70MB。该作业是使用GCP ML引擎执行的。没有GPU。这是管道:...
为了社区的利益,在答案部分中提到@AlexisBRENON的解决方案和重要观察。