I'm a Dataflow beginner, and I'm using this generic script to load a CSV file into BigQuery with Dataflow.
import argparse
import csv
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from google.cloud import bigquery
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', dest='config_file', required=True,
                        help='Path to the configuration file')
    known_args, pipeline_args = parser.parse_known_args(argv)
    # Parse the configuration file
    config = parse_config(known_args.config_file)
    # Create the pipeline options
    options = PipelineOptions(pipeline_args)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = config['project_id']
    google_cloud_options.region = config['region']
    google_cloud_options.staging_location = config['staging_location']
    google_cloud_options.temp_location = config['temp_location']
    google_cloud_options.job_name = config['job_name']
    options.view_as(StandardOptions).runner = config['runner']
    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read the CSV file from GCS
        lines = p | 'Read CSV from GCS' >> beam.io.ReadFromText(config['csv_file'])
        # Write the rows to BigQuery
        lines | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table=config['table'],
            schema=config['schema'],
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            project=google_cloud_options.project,
            dataset=config['dataset'],
        )

def parse_config(config_file):
    """Parse the configuration file and return a dictionary of values."""
    config = {}
    with open(config_file) as f:
        for line in f:
            if line.startswith('#'):
                continue
            try:
                key, value = line.strip().split('=', 1)
            except ValueError:
                print(f"Error parsing line: {line}")
                raise
            config[key.strip()] = value.strip()
    return config

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Configuration file:
project_id = india-tvm-poc-de
region = us-central1
temp_location = gs://india-tvm-poc-de-123/dataflow_csv/
staging_location = gs://india-tvm-poc-de-123/dataflow_csv/
job_name = csvbqload-dataflow-2
runner = DataflowRunner
csv_file = gs://india-tvm-poc-de-123/dataflow_csv/sample_data.csv
schema = [{"name": "name", "type": "STRING", "mode": "REQUIRED"}, {"name": "email", "type": "STRING", "mode": "REQUIRED"}]
table = dataflow_load
dataset = sample_dataset
Simple CSV file:
name,email
John,Doe
Jane,Smith
I'm getting the following error:
Response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Fri, 05 May 2023 13:10:31 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '316', '-content-encoding': 'gzip'}>, content <{ "error": { "code": 400, "message": "Invalid value for type: "NAME" is not a valid value", "errors": [ { "message": "Invalid value for type: "NAME" is not a valid value", "domain": "global", "reason": "invalid" } ], "status": "INVALID_ARGUMENT" } } > [while running 'Write to BigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)-ptransform-93']
I tried changing the CSV file to simpler values, removing the header, and using int values, but I still get the error. It's a simple script, so I'm not sure what the problem is or why it fails. Thanks in advance.
As @GuillaumeBlaquiere said in his comment, if you have a small CSV file, there is no need to use Dataflow; it is better to use the BigQuery Python client directly.
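For reference, here is a minimal sketch of that direct approach with the google.cloud.bigquery client, reusing the bucket, dataset, and table names from your config (treat them as placeholders for your own values):

from google.cloud import bigquery

client = bigquery.Client(project="india-tvm-poc-de")

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # skip the header row of the CSV
    schema=[
        bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("email", "STRING", mode="REQUIRED"),
    ],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

load_job = client.load_table_from_uri(
    "gs://india-tvm-poc-de-123/dataflow_csv/sample_data.csv",
    "india-tvm-poc-de.sample_dataset.dataflow_load",
    job_config=job_config,
)
load_job.result()  # wait for the load job to finish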
However, if for any reason you want to or have to stay with Dataflow, I share with you a complete solution that reads a CSV file in Beam with a real Python CSV parser.
The class I wrote reads a CSV file in Beam and emits each row as a Dict:
import codecs
from csv import QUOTE_ALL
from typing import Iterable, Dict

import apache_beam as beam
from apache_beam import PCollection
from apache_beam.io import fileio
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems as beam_fs


class ReadCsvFiles(beam.PTransform):
    """Reads the CSV files matching a pattern and emits one Dict per row."""

    def __init__(self,
                 file_pattern: str,
                 compression_type: CompressionTypes,
                 delimiter=',',
                 quotechar='"',
                 doublequote=True,
                 skipinitialspace=False,
                 lineterminator='\n',
                 quoting=QUOTE_ALL):
        super().__init__()
        self._file_pattern = file_pattern
        self._compression_type = compression_type
        self._delimiter = delimiter
        self._quotechar = quotechar
        self._doublequote = doublequote
        self._skipinitialspace = skipinitialspace
        self._lineterminator = lineterminator
        self._quoting = quoting

    def expand(self, pbegin: beam.pvalue.PBegin) -> PCollection[Dict[str, str]]:
        return (
            pbegin
            | 'Match files' >> fileio.MatchFiles(self._file_pattern)
            | 'Read CSV lines' >> beam.FlatMap(self._read_csv_lines_as_dicts)
        )

    def _get_csv_reader(self, result_file_as_iterator):
        # Imported locally so the module is available on Dataflow workers.
        import csv
        return csv.DictReader(
            result_file_as_iterator,
            delimiter=self._delimiter,
            quotechar=self._quotechar,
            doublequote=self._doublequote,
            skipinitialspace=self._skipinitialspace,
            lineterminator=self._lineterminator,
            quoting=self._quoting)

    def _read_csv_lines_as_dicts(self, readable_file_metadata) -> Iterable[Dict[str, str]]:
        # Open the raw bytes ourselves so we control the decompression below.
        with beam_fs.open(readable_file_metadata.path, compression_type=CompressionTypes.UNCOMPRESSED) as f:
            import gzip
            if self._compression_type == CompressionTypes.UNCOMPRESSED:
                for row in self._get_csv_reader(codecs.iterdecode(f, 'utf-8')):
                    yield dict(row)
            else:
                with gzip.open(f, "rt") as gzip_text_io_wrapper:
                    for row in self._get_csv_reader(gzip_text_io_wrapper):
                        yield dict(row)
This class also handles CSV files compressed with gzip.
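For instance, a hypothetical call for gzipped files (the bucket path is illustrative); any compression type other than UNCOMPRESSED routes the file handle through gzip.open():

rows = p | 'Read gzipped CSV files' >> ReadCsvFiles('gs://bucket/*.csv.gz', CompressionTypes.GZIP)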
An example of how to use this class:
result: PCollection[Dict] = (
    p
    | 'Read CSV files' >> ReadCsvFiles('gs://bucket/*.csv', CompressionTypes.UNCOMPRESSED)
    | 'Map after' >> beam.Map(my_map_function))
...
I use fileio.MatchFiles from Beam, and then csv.DictReader to transform the CSV lines into Dicts.
You can pass all the CSV file options when instantiating the ReadCsvFiles transform.
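Finally, a hedged sketch of how this transform could replace ReadFromText in your original pipeline (reusing the config, options, and google_cloud_options variables from your script). Note that WriteToBigQuery expects the schema either as a 'name:TYPE,name:TYPE' string or as a {'fields': [...]} dict; passing the JSON-list string from your config file as-is is most likely what triggers the Invalid value for type: "NAME" error:

with beam.Pipeline(options=options) as p:
    (
        p
        | 'Read CSV files' >> ReadCsvFiles(config['csv_file'], CompressionTypes.UNCOMPRESSED)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table=config['table'],
            dataset=config['dataset'],
            project=google_cloud_options.project,
            # Schema as a 'name:TYPE' string, not a JSON list
            # (assumption: the two STRING columns from your sample file).
            schema='name:STRING,email:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )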