Dataflow TensorFlow Transform: writing transformed data to BigQuery

Problem description

In a GCP Dataflow pipeline, I am trying to write the data transformed by the Transform component to BigQuery, and I get the error below. First of all, I would appreciate it if someone could tell me whether there is a standard solution for writing the transformed data emitted by Transform to BigQuery. If not, please suggest what should be corrected in my code. Thank you.

 File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
    writer.write(row)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1400, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1351, in encode
    default=default_encoder).encode('utf-8')
  File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 134, in default_encoder
    "Object of type '%s' is not JSON serializable" % type(obj).__name__)
TypeError: Object of type 'RecordBatch' is not JSON serializable [while running 'train - Write to BigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-128']

The code is:

import apache_beam as beam
import tensorflow_transform.beam as tft_beam

def analyze_and_transform(raw_dataset, step):
    
    # Analyze the raw data with preprocess_fn and apply the transform;
    # output_record_batches=True makes the transformed data come out as
    # pyarrow.RecordBatches rather than instance dicts.
    transformed_dataset, transform_fn = (
        raw_dataset 
        | '{} - Analyze & Transform'.format(step) >> tft_beam.AnalyzeAndTransformDataset(
            preprocess_fn, output_record_batches=True)
    )
    
    return transformed_dataset, transform_fn

def transform(raw_dataset, transform_fn, step):
    
    transformed_dataset = (
        (raw_dataset, transform_fn) 
        | '{} - Transform'.format(step) >> tft_beam.TransformDataset(output_record_batches=True)
    )
    
    return transformed_dataset

from tensorflow_metadata.proto.v0 import schema_pb2

def convert_schema_to_string(schema):
    schema_string = ''
    for feature in schema.feature:
        if feature.type==schema_pb2.FLOAT:
            feature_type = 'FLOAT'
        elif feature.type == schema_pb2.INT and feature.int_domain.is_categorical == True:
            feature_type = 'STRING'
        else:
            feature_type = 'INTEGER'
            
        schema_string += '{}:{}'.format(feature.name, feature_type)

        schema_string += ','
    schema_string = schema_string.rstrip(',')
    return schema_string
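For what it's worth, here is a minimal, self-contained sketch (the feature names are made up) of the kind of 'name:TYPE,name:TYPE' string this function produces, which is the schema-string form that beam.io.WriteToBigQuery accepts:

from tensorflow_metadata.proto.v0 import schema_pb2

# Hypothetical schema: one float feature, one categorical int, one plain int.
example_schema = schema_pb2.Schema()

age = example_schema.feature.add()
age.name = 'age'
age.type = schema_pb2.FLOAT

label = example_schema.feature.add()
label.name = 'label'
label.type = schema_pb2.INT
label.int_domain.is_categorical = True

count = example_schema.feature.add()
count.name = 'count'
count.type = schema_pb2.INT

print(convert_schema_to_string(example_schema))
# age:FLOAT,label:STRING,count:INTEGER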


def write_to_bigquery(transformed_dataset, step):
    
    transformed_data, transformed_metadata = transformed_dataset
    schema_string = convert_schema_to_string(transformed_metadata.schema)
    
    transformed_data | '{} - Write to BigQuery'.format(step) >> beam.io.WriteToBigQuery(
        table=f'{PROJECT}.{OUT_DATASET_ID}.{OUT_TABLE_NAME}',
        schema=schema_string,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )

The pipeline:

with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    logging.info(f'pipeline_options: {pipeline_options}')
    logging.getLogger().setLevel(logging.INFO)
    
    print(pipeline_options)
    
    with tft_beam.Context(temporary_dir):
        
        # Preprocess train data
        step = 'train'
        # Read raw train data from BQ
        raw_train_dataset = read_from_bq(pipeline, step, data_size)
        # Analyze and transform raw_train_dataset 
        transformed_train_dataset, transform_fn = analyze_and_transform(raw_train_dataset, step)
        # transformed_train_data, transformed_train_metadata = transformed_train_dataset
        
        # Write transformed train data to sink as tfrecords
        write_tfrecords(transformed_train_dataset, transformed_data_location, step)
        # Write transformed train data to BigQuery

        write_to_bigquery(transformed_train_dataset, step)

[Update]: As a first test, to get instance-dict output, I set:

tft_beam.TransformDataset(output_record_batches=False)

because, according to the documentation: "... If True, AnalyzeAndTransformDataset outputs pyarrow.RecordBatches; otherwise, it outputs instance dicts." I then got another error:

File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
    writer.write(row)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1400, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1351, in encode
    default=default_encoder).encode('utf-8')
  File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 134, in default_encoder
    "Object of type '%s' is not JSON serializable" % type(obj).__name__)
TypeError: Object of type 'float32' is not JSON serializable [while running 'train - Write to BigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-110']
google-bigquery google-cloud-dataflow apache-beam tensorflow-transform

1 Answer

I solved this by converting the dtypes to int and float for the BigQuery request.
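For context, the instance dicts coming out of tf.Transform contain numpy scalars (e.g. numpy.float32), which the JSON encoder used by the BigQuery file-loads writer cannot serialize. Below is a minimal sketch of such a conversion, assuming every value is a numpy scalar, a bytes string, or a list/array of them (the helper name and the Map step are illustrative, not part of the original pipeline):

import numpy as np
import apache_beam as beam

def to_json_serializable(row):
    # Cast numpy scalars to native Python int/float and decode bytes so that
    # each row dict can be JSON-encoded by WriteToBigQuery's file-loads path.
    def convert(value):
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, bytes):
            return value.decode('utf-8')
        if isinstance(value, (list, np.ndarray)):
            return [convert(v) for v in value]
        return value
    return {key: convert(value) for key, value in row.items()}

# In write_to_bigquery, before the sink:
#     transformed_data
#         | '{} - To JSON types'.format(step) >> beam.Map(to_json_serializable)
#         | '{} - Write to BigQuery'.format(step) >> beam.io.WriteToBigQuery(...)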
