I have built a pipeline that reads some data, applies some transformations, and creates apache beam Row objects (steps 1 and 2 in the code below). I then want to generate statistics and write them to a file. I know I can use the TensorFlow Data Validation library for this, but tfdv.GenerateStatistics expects pyarrow.lib.RecordBatch objects rather than Row objects. I know that apache_beam.io.tfrecordio.WriteToTFRecord can write a PCollection to a file as TFRecords, but is there a way to do this without writing to a file? Ideally, step 3 would convert the Row objects to TFRecords.
with beam.Pipeline(options=pipeline_options) as pipeline:
    result = (
        pipeline
        | 'Step 1: Read data' >> ...
        | 'Step 2: Do some transformations' >> ...  # Here the resulting objects are beam Rows
        | 'Step 3: Transform Rows to TFRecord' >> ...
        | 'Step 4: GenerateStatistics' >> tfdv.GenerateStatistics()
        | 'Step 5: WriteStatsOutput' >> WriteStatisticsToTFRecord(STATS_OUT_FILE)
    )
Without Step 3, my pipeline produces the following error:
apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GenerateStatistics: expected <class 'pyarrow.lib.RecordBatch'>, got Row(attribute_1=Any, attribute_2=Any, ..., attribute_n=Any)
Full type hint:
IOTypeHints[inputs=((<class 'pyarrow.lib.RecordBatch'>,), {}), outputs=((<class 'tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList'>,), {})]
strip_pcoll_output()
Update: I have been working on implementing Step 3 in the pipeline above. GenerateStatistics takes pyarrow RecordBatch objects as input, so I tried to build them from the dictionaries obtained in the previous steps. I came up with the following:
class DictToPyarrow(beam.DoFn):
    def __init__(self, schema: pyarrow.Schema, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.schema = schema

    def process(self, pydict):
        # Wrap each value in a list so pyarrow sees a one-row column.
        dict_list = {key: [val] for key, val in pydict.items()}
        table = pyarrow.Table.from_pydict(dict_list, schema=self.schema)
        batches = table.to_batches()
        yield batches[0]
with beam.Pipeline(options=pipeline_options) as pipeline:
    result = (
        pipeline
        | 'Step 1: Read data' >> ...
        | 'Step 2: Do some transformations' >> ...  # Here the resulting objects are beam Rows
        | 'Step 3: To pyarrow' >> beam.ParDo(DictToPyarrow(schema))
        | 'Step 4: GenerateStatistics' >> tfdv.GenerateStatistics()
        | 'Step 5: WriteStatsOutput' >> WriteStatisticsToTFRecord(STATS_OUT_FILE)
    )
But I get the following error:
TypeError: Expected feature column to be a (Large)List<primitive|struct> or null, but feature my_feature_1 was int32. [while running 'GenerateStatistics/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/ToTopKTuples']
Update 2: I was able to create a tfx ExampleGen object from the beam pipeline, thanks to this. Something like:
with beam.Pipeline(options=pipeline_options) as pipeline:
    result = (
        pipeline
        | 'Step 1: Read data' >> beam.io.ReadFromParquet(gsc_input_file)
        | 'Step 2: ToTFExample' >> beam.Map(utils.dict_to_example)
        | 'Step 3: GenerateStatistics' >> GenerateStatistics(tfdv_options)
    )
But I get the following error:
apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GenerateStatistics: expected <class 'pyarrow.lib.RecordBatch'>, got <class 'tensorflow.core.example.example_pb2.Example'>
Full type hint:
IOTypeHints[inputs=((<class 'pyarrow.lib.RecordBatch'>,), {}), outputs=((<class 'tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList'>,), {})]
strip_pcoll_output()
It seems that from tensorflow_data_validation import GenerateStatistics wants RecordBatch objects.
Unfortunately, there is no way to do this other than writing to a file. This step is needed because ML frameworks consume training data as a sequence of examples, and a file format used for ML training should have an easy-to-use layout with no impedance mismatch with the storage platform or with the programming language used to read and write the files. TFRecords offer several advantages in this regard; you can find more details in the TFRecord documentation.