Python script to load a CSV file into BigQuery

Problem description

I am a Dataflow beginner and am using this generic script to load a CSV file into BigQuery with Dataflow.

import argparse
import csv
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from google.cloud import bigquery


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', dest='config_file', required=True,
                        help='Path to the configuration file')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Parse the configuration file
    config = parse_config(known_args.config_file)

    # Create the pipeline options
    options = PipelineOptions(pipeline_args)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = config['project_id']
    google_cloud_options.region = config['region']
    google_cloud_options.staging_location = config['staging_location']
    google_cloud_options.temp_location = config['temp_location']
    google_cloud_options.job_name = config['job_name']
    options.view_as(StandardOptions).runner = config['runner']

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read the CSV file from GCS
        lines = p | 'Read CSV from GCS' >> beam.io.ReadFromText(config['csv_file'])

        # Write the rows to BigQuery
        lines | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table=config['table'],
            schema=config['schema'],
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            project=google_cloud_options.project,
            dataset=config['dataset'],
        )


def parse_config(config_file):
    """Parse the configuration file and return a dictionary of values."""
    config = {}
    with open(config_file) as f:
        for line in f:
            if line.startswith('#'):
                continue
            try:
                key, value = line.strip().split('=', 1)
            except ValueError:
                print(f"Error parsing line: {line}")
                raise
            config[key.strip()] = value.strip()
    return config


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Configuration file:

project_id = india-tvm-poc-de
region = us-central1
temp_location = gs://india-tvm-poc-de-123/dataflow_csv/
staging_location = gs://india-tvm-poc-de-123/dataflow_csv/
job_name = csvbqload-dataflow-2
runner = DataflowRunner
csv_file = gs://india-tvm-poc-de-123/dataflow_csv/sample_data.csv
schema = [{"name": "name", "type": "STRING", "mode": "REQUIRED"}, {"name": "email", "type": "STRING", "mode": "REQUIRED"}]
table = dataflow_load
dataset = sample_dataset

A simple CSV file:

name,email
John,Doe
Jane,Smith

I am getting the following error:

Response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Fri, 05 May 2023 13:10:31 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '316', '-content-encoding': 'gzip'}>, content <{ "error": { "code": 400, "message": "Invalid value for type: "NAME" is not a valid value", "errors": [ { "message": "Invalid value for type: "NAME" is not a valid value", "domain": "global", "reason": "invalid" } ], "status": "INVALID_ARGUMENT" } }> [while running 'Write to BigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)-ptransform-93']

I tried changing the CSV file to simple values, removing the header, and using int values, but I still get the error. It is a simple script and I am not sure what the problem is or why it fails. Thanks in advance.

python google-cloud-platform google-cloud-dataflow apache-beam
1 Answer
0 votes

As @GuillaumeBlaquiere said in his comment, if you have a small CSV file, there is no need to use Dataflow; it is better to use the BigQuery Python client directly.
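
For reference, here is a minimal sketch of that client-library approach. It reuses the bucket, dataset and table names from the question, which are assumptions about your setup rather than a verified configuration:

from google.cloud import bigquery

# Minimal sketch, not a drop-in solution: loads the CSV from GCS straight
# into BigQuery with the client library, no Dataflow involved.
client = bigquery.Client(project='india-tvm-poc-de')

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # skip the "name,email" header row
    schema=[
        bigquery.SchemaField('name', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('email', 'STRING', mode='REQUIRED'),
    ],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

load_job = client.load_table_from_uri(
    'gs://india-tvm-poc-de-123/dataflow_csv/sample_data.csv',
    'india-tvm-poc-de.sample_dataset.dataflow_load',
    job_config=job_config,
)
load_job.result()  # wait for the load job to complete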

However, if for any reason you want to or have to stay on Dataflow, I am sharing a complete solution that allows reading CSV files in Beam with a real Python parser.

The class I wrote below allows reading CSV files in Beam as Dicts:

import codecs
from csv import QUOTE_ALL
from typing import Iterable, Dict

import apache_beam as beam
from apache_beam import PCollection
from apache_beam.io import fileio
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems as beam_fs


class ReadCsvFiles(beam.PTransform):
    """Read CSV files matching a pattern and yield each row as a Dict."""

    def __init__(self,
                 file_pattern: str,
                 compression_type: CompressionTypes,
                 delimiter=',',
                 quotechar='"',
                 doublequote=True,
                 skipinitialspace=False,
                 lineterminator='\n',
                 quoting=QUOTE_ALL):
        super().__init__()
        self._file_pattern = file_pattern
        self._compression_type = compression_type
        self._delimiter = delimiter
        self._quotechar = quotechar
        self._doublequote = doublequote
        self._skipinitialspace = skipinitialspace
        self._lineterminator = lineterminator
        self._quoting = quoting

    def expand(self, pbegin: beam.pvalue.PBegin) -> PCollection[Dict[str, str]]:
        return (
                pbegin
                | 'Match files' >> fileio.MatchFiles(self._file_pattern)
                | 'Read CSV lines' >> beam.FlatMap(self._read_csv_lines_as_dicts)
        )

    def _get_csv_reader(self, result_file_as_iterator):
        import csv
        return csv.DictReader(
            result_file_as_iterator,
            delimiter=self._delimiter,
            quotechar=self._quotechar,
            doublequote=self._doublequote,
            skipinitialspace=self._skipinitialspace,
            lineterminator=self._lineterminator,
            quoting=self._quoting)

    def _read_csv_lines_as_dicts(self, readable_file_metadata) -> Iterable[Dict[str, str]]:
        # Open the matched file and parse it with csv.DictReader,
        # decompressing with gzip first if the file is compressed.
        with beam_fs.open(readable_file_metadata.path, compression_type=CompressionTypes.UNCOMPRESSED) as f:
            import gzip
            if self._compression_type == CompressionTypes.UNCOMPRESSED:
                for row in self._get_csv_reader(codecs.iterdecode(f, 'utf-8')):
                    yield dict(row)
            else:
                with gzip.open(f, "rt") as gzip_text_io_wrapper:
                    for row in self._get_csv_reader(gzip_text_io_wrapper):
                        yield dict(row)

This class also handles CSV files compressed with gzip.

Example usage of the class:

result: PCollection[Dict] = (
        p
        | 'Read CSV files' >> ReadCsvFiles('gs://bucket/*.csv', CompressionTypes.UNCOMPRESSED)
        | 'Map after' >> beam.Map(my_map_function))
...

I use fileio.MatchFiles from Beam, and then csv.DictReader to turn the CSV lines into Dicts.
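
To illustrate what the resulting elements look like, here is a tiny standalone example using the sample data from the question:

import csv
import io

# Each element produced by ReadCsvFiles is a plain dict keyed by the header
# row, exactly like what csv.DictReader yields here.
sample = 'name,email\nJohn,Doe\nJane,Smith\n'
for row in csv.DictReader(io.StringIO(sample)):
    print(dict(row))
# {'name': 'John', 'email': 'Doe'}
# {'name': 'Jane', 'email': 'Smith'}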

You can pass all the CSV options when instantiating the ReadCsvFiles transform.
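
To connect this back to the original goal, here is a hedged sketch (not part of the original answer) of a pipeline that reads with ReadCsvFiles and then writes the dicts to BigQuery. Pipeline options are omitted, and the project, bucket, dataset, table and schema simply reuse the values from the question, so treat them as assumptions:

import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes

# Sketch only: the dict keys produced by ReadCsvFiles must match the
# BigQuery schema field names ('name' and 'email' here).
with beam.Pipeline() as p:
    (
        p
        | 'Read CSV files' >> ReadCsvFiles(
            'gs://india-tvm-poc-de-123/dataflow_csv/sample_data.csv',
            CompressionTypes.UNCOMPRESSED)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table='dataflow_load',
            dataset='sample_dataset',
            project='india-tvm-poc-de',
            # Comma-separated schema string rather than the JSON list
            # used in the question's config file.
            schema='name:STRING,email:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )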
