In a GCP Dataflow pipeline, I am trying to write the data transformed by the Transform component to BigQuery, but I get the error below. First, I would appreciate it if someone could tell me whether there is a standard solution for writing the transformed data emitted by the Transform to BigQuery. If not, please suggest what should be corrected in my code. Thanks.
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
writer.write(row)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1400, in write
return self._file_handle.write(self._coder.encode(row) + b'\n')
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1351, in encode
default=default_encoder).encode('utf-8')
File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 134, in default_encoder
"Object of type '%s' is not JSON serializable" % type(obj).__name__)
TypeError: Object of type 'RecordBatch' is not JSON serializable [while running 'train - Write to BigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-128']
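For context, beam.io.WriteToBigQuery JSON-encodes each element, so it expects plain row dicts, while output_record_batches=True makes the transformed data arrive as pyarrow.RecordBatches. A minimal sketch of unpacking those into row dicts (the helper name and the tuple unpacking are my assumptions, not a confirmed standard solution):

def record_batch_to_rows(element):
    # Assumption: with output_record_batches=True the transformed-data
    # PCollection holds (pyarrow.RecordBatch, pass-through features) tuples;
    # adjust the unpacking if your elements are bare RecordBatches.
    record_batch, _pass_through = element
    names = record_batch.schema.names
    # Per-column to_pylist() keeps this compatible with older pyarrow.
    columns = [record_batch.column(i).to_pylist() for i in range(record_batch.num_columns)]
    for values in zip(*columns):
        # Note: TFT often emits list-valued cells (e.g. [3.2]); unwrap
        # single-element lists here if your BigQuery columns are scalars.
        yield dict(zip(names, values))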
The code is:
def analyze_and_transform(raw_dataset, step):
    # Analyze the raw dataset and transform it in one pass, keeping the
    # transform_fn so it can be reused on eval/test data.
    transformed_dataset, transform_fn = (
        raw_dataset
        | '{} - Analyze & Transform'.format(step) >> tft_beam.AnalyzeAndTransformDataset(
            preprocess_fn, output_record_batches=True)
    )
    return transformed_dataset, transform_fn

def transform(raw_dataset, transform_fn, step):
    # Apply a previously computed transform_fn to another dataset.
    transformed_dataset = (
        (raw_dataset, transform_fn)
        | '{} - Transform'.format(step) >> tft_beam.TransformDataset(output_record_batches=True)
    )
    return transformed_dataset
from tensorflow_metadata.proto.v0 import schema_pb2

def convert_schema_to_string(schema):
    # Map TFT feature types to a BigQuery schema string of the
    # form 'name:TYPE,name:TYPE,...'.
    schema_string = ''
    for feature in schema.feature:
        if feature.type == schema_pb2.FLOAT:
            feature_type = 'FLOAT'
        elif feature.type == schema_pb2.INT and feature.int_domain.is_categorical:
            feature_type = 'STRING'
        else:
            feature_type = 'INTEGER'
        schema_string += '{}:{},'.format(feature.name, feature_type)
    return schema_string.rstrip(',')
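To make the helper's output concrete, a quick check against a hypothetical two-feature schema (the feature names are made up):

from tensorflow_metadata.proto.v0 import schema_pb2

schema = schema_pb2.Schema()
schema.feature.add(name='fare', type=schema_pb2.FLOAT)
schema.feature.add(name='age', type=schema_pb2.INT)
print(convert_schema_to_string(schema))  # -> fare:FLOAT,age:INTEGER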
def write_to_bigquery(transformed_dataset, step):
    transformed_data, transformed_metadata = transformed_dataset
    schema_string = convert_schema_to_string(transformed_metadata.schema)
    transformed_data | '{} - Write to BigQuery'.format(step) >> beam.io.WriteToBigQuery(
        table=f'{PROJECT}.{OUT_DATASET_ID}.{OUT_TABLE_NAME}',
        schema=schema_string,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )
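Since transformed_data holds RecordBatches when output_record_batches=True, one option (a sketch reusing the hypothetical record_batch_to_rows helper above) is to flatten them into dicts just before the sink:

rows = (
    transformed_data
    | '{} - To Rows'.format(step) >> beam.FlatMap(record_batch_to_rows)
)
rows | '{} - Write to BigQuery'.format(step) >> beam.io.WriteToBigQuery(
    table=f'{PROJECT}.{OUT_DATASET_ID}.{OUT_TABLE_NAME}',
    schema=schema_string,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)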
The pipeline:
with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    logging.info(f'pipeline_options: {pipeline_options}')
    logging.getLogger().setLevel(logging.INFO)
    print(pipeline_options)
    with tft_beam.Context(temporary_dir):
        # Preprocess train data
        step = 'train'
        # Read raw train data from BQ
        raw_train_dataset = read_from_bq(pipeline, step, data_size)
        # Analyze and transform raw_train_dataset
        transformed_train_dataset, transform_fn = analyze_and_transform(raw_train_dataset, step)
        # transformed_train_data, transformed_train_metadata = transformed_train_dataset
        # Write transformed train data to sink as tfrecords
        write_tfrecords(transformed_train_dataset, transformed_data_location, step)
        # Write transformed train data to BigQuery
        write_to_bigquery(transformed_train_dataset, step)
[UPDATE]: As a first test, to get instance-dict output instead, I set:
tft_beam.TransformDataset(output_record_batches=False)
since, according to the documentation: "... If True, AnalyzeAndTransformDataset outputs pyarrow.RecordBatches; otherwise, outputs instance dicts." I then got a different error:
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
writer.write(row)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1400, in write
return self._file_handle.write(self._coder.encode(row) + b'\n')
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1351, in encode
default=default_encoder).encode('utf-8')
File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 134, in default_encoder
"Object of type '%s' is not JSON serializable" % type(obj).__name__)
TypeError: Object of type 'float32' is not JSON serializable [while running 'train - Write to BigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-110']
I solved this by casting the dtypes to native Python int and float before the rows reach the BigQuery sink.
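For reference, a minimal sketch of that cast (the helper name is mine; it assumes the instance dicts hold numpy scalars):

import numpy as np

def cast_row_for_bigquery(row):
    # Cast numpy scalars to native Python types so json.dumps accepts them.
    cast = {}
    for name, value in row.items():
        if isinstance(value, np.floating):
            cast[name] = float(value)
        elif isinstance(value, np.integer):
            cast[name] = int(value)
        else:
            cast[name] = value
    return cast

# Applied just before the sink:
# transformed_data | beam.Map(cast_row_for_bigquery) | beam.io.WriteToBigQuery(...)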