apache 将行发送到 tfrecord 以便生成统计数据

问题描述 投票:0回答:1

我已经构建了一个管道,可以读取一些数据,进行一些操作并创建一些 apache beam Row 对象(下面代码中的步骤 1 和 2)。然后我想生成统计数据并将它们写入文件。我可以利用tensorflow数据验证库来实现这一点,但tfdvGenerateStatistics需要一个

pyarrow.lib.RecordBatch
而不是
Row
对象。我知道
apache_beam.io.tfrecordio.WriteToTFRecord
可以将 PCollection 写入文件为 TFRecord,但是,有没有办法在不写入文件的情况下完成此操作?理想情况下,第 3 步会将
Row
对象转换为
TFRecord

with beam.Pipeline(options=pipeline_options) as pipeline:
    result = ( pipeline  
             | 'Step 1: Read data' >> ...
             | 'Step 2: Do some transformations' >> ...  # Here the resulting objects are beam Rows
             | 'Step 3: transform Rows to TFRecord' >> ...
             | 'Step 4: GenerateStatistics' >> tfdv.GenerateStatistics()
             | 'Step 5: WriteStatsOutput' >> WriteStatisticsToTFRecord(STATS_OUT_FILE)
            )

如果没有步骤 3,我的管道将生成以下错误:

apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GenerateStatistics: expected <class 'pyarrow.lib.RecordBatch'>, got Row(attribute_1=Any, attribute_2=Any, ..., attribute_n=Any)
Full type hint:
IOTypeHints[inputs=((<class 'pyarrow.lib.RecordBatch'>,), {}), outputs=((<class 'tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList'>,), {})]
strip_pcoll_output()

更新: 我一直致力于尝试在上面的管道中实现

Step 3
GenerateStatistics
将 pyarrow RecordBatch 对象作为输入,因此我尝试根据之前步骤中获得的字典来构建它。我可以得出以下结论:

class DictToPyarrow(beam.DoFn):
    def __init__(self, schema: pyarrow.schema, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.schema = schema

    def process(self, pydict):
        dict_list = dict()
        for key, val in pydict.items():
            dict_list[key] = [val]
        table = pyarrow.Table.from_pydict(dict_list, schema=self.schema)
        batches = table.to_batches()
        yield batches[0]

with beam.Pipeline(options=pipeline_options) as pipeline:
    result = ( pipeline  
             | 'Step 1: Read data' >> ...
             | 'Step 2: Do some transformations' >> ...  # Here the resulting objects are beam Rows
             | "STEP 3: To pyarrow" >> beam.ParDo(DictToPyarrow(schema))
             | 'Step 4: GenerateStatistics' >> tfdv.GenerateStatistics()
             | 'Step 5: WriteStatsOutput' >> WriteStatisticsToTFRecord(STATS_OUT_FILE)
            )
 

但是我收到以下错误:

TypeError: Expected feature column to be a (Large)List<primitive|struct> or null, but feature my_feature_1 was int32. [while running 'GenerateStatistics/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/ToTopKTuples']

更新2:我能够从梁管道创建一个tfxExampleGen对象,感谢this。类似的东西

with beam.Pipeline(options=pipeline_options) as pipeline:
    result = ( pipeline  
             | 'Step 1: Read data' >> beam.io.ReadFromParquet(gsc_input_file)
             | 'Step2: ToTFExample' >> beam.Map(utils.dict_to_example)
             | "Step3: GenerateStatistics" >> GenerateStatistics(tfdv_options)
)

但是我收到以下错误

apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GenerateStatistics: expected <class 'pyarrow.lib.RecordBatch'>, got <class 'tensorflow.core.example.example_pb2.Example'>
Full type hint:
IOTypeHints[inputs=((<class 'pyarrow.lib.RecordBatch'>,), {}), outputs=((<class 'tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList'>,), {})]
strip_pcoll_output()

看来

from tensorflow_data_validation import GenerateStatistics
想要
RecordBatch

apache-beam apache-beam-io tensorflow-transform tensorflow-data-validation
1个回答
1
投票

不幸的是,除了写入文件之外,没有其他方法可以做到这一点。我们需要执行此过程,因为机器学习框架将训练数据作为示例序列来使用,这种用于训练 ML 的文件格式应该具有易于使用的布局,并且与用于读/写文件的存储平台或编程语言没有阻抗不匹配。

使用

TFRecords
的一些优点是:

  • 训练模型的最佳性能。
  • Binay 数据占用的磁盘空间更少,复制和复制所需的时间也更少 可以更有效地从磁盘读取。
  • 它使得组合多个数据集并集成变得更加容易 与数据导入和预处理功能无缝结合 图书馆提供的数据集,特别是对于太 大。

您可以查看更多关于 TFRecord 的文档。

© www.soinside.com 2019 - 2024. All rights reserved.