How to read a BigQuery table from Python pipeline code in GCP Dataflow

Question · 0 votes · 3 answers

Can someone share the syntax for reading/writing a BigQuery table in a pipeline written in Python for GCP Dataflow?

python google-cloud-dataflow google-cloud-platform
3 Answers

9 votes

Running on Dataflow

First, construct a Pipeline with the following options so that it runs on GCP Dataflow:

import apache_beam as beam

# Values in angle brackets are placeholders for your own project settings
options = {'project': <project>,
           'runner': 'DataflowRunner',
           'region': <region>,
           'setup_file': <setup.py file>}
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
pipeline = beam.Pipeline(options=pipeline_options)
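
In newer Beam releases the same options are usually supplied as command-line-style flags instead of a dict; a minimal equivalent sketch, keeping the same angle-bracket placeholders:

    from apache_beam.options.pipeline_options import PipelineOptions

    # Same placeholder values as above, passed as flag strings
    pipeline_options = PipelineOptions([
        '--project=<project>',
        '--region=<region>',
        '--runner=DataflowRunner',
        '--temp_location=gs://<bucket>/temp',
    ])
    pipeline = beam.Pipeline(options=pipeline_options)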

Reading from BigQuery

Define a BigQuerySource with your query and use beam.io.Read to read the data from BQ:

BQ_source = beam.io.BigQuerySource(query = <query>)
BQ_data = pipeline | beam.io.Read(BQ_source)
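
Note that BigQuerySource and beam.io.Read are deprecated in recent Beam SDKs in favor of beam.io.ReadFromBigQuery, which yields each row as a dictionary. A minimal equivalent sketch:

    BQ_data = pipeline | 'ReadFromBQ' >> beam.io.ReadFromBigQuery(
        query=<query>,
        use_standard_sql=True)  # default is legacy SQL; set True for Standard SQL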

Writing to BigQuery

There are two options for writing to BigQuery (a note on schemas and dispositions follows the list):

  • Use BigQuerySink with beam.io.Write:

    BQ_sink = beam.io.BigQuerySink(<table>, dataset=<dataset>, project=<project>)
    BQ_data | beam.io.Write(BQ_sink)
    
  • Use beam.io.WriteToBigQuery:

    BQ_data | beam.io.WriteToBigQuery(<table>, dataset=<dataset>, project=<project>)
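
If the target table may not exist yet, WriteToBigQuery also accepts a schema string and create/write dispositions. A minimal sketch with placeholder field names:

    BQ_data | beam.io.WriteToBigQuery(
        <table>,
        dataset=<dataset>,
        project=<project>,
        schema='name:STRING, value:INTEGER',  # placeholder schema
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)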
    

3 votes

Reading from BigQuery

rows = (p | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True)))

Writing to BigQuery

rows | 'writeToBQ' >> beam.io.Write(
    beam.io.BigQuerySink(
        '{}:{}.{}'.format(PROJECT, BQ_DATASET_ID, BQ_TEST),
        schema='CONVERSATION:STRING, LEAD_ID:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
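
BigQuerySink is likewise deprecated in newer SDKs; the same write, expressed with beam.io.WriteToBigQuery and keeping the same table string, schema, and dispositions, would look roughly like:

    rows | 'writeToBQ' >> beam.io.WriteToBigQuery(
        '{}:{}.{}'.format(PROJECT, BQ_DATASET_ID, BQ_TEST),
        schema='CONVERSATION:STRING, LEAD_ID:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)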

0 votes
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Set the necessary pipeline options, such as project and job name
pipeline_options = PipelineOptions(
    project='your-project-id',
    job_name='dataflow-job',
    staging_location='gs://your-bucket/staging',
    temp_location='gs://your-bucket/temp',
    runner='DataflowRunner'
)

# Create a pipeline object using the options
p = beam.Pipeline(options=pipeline_options)

# Define a function to read data from BigQuery
def read_from_bigquery():
    return (p
            | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
                query='SELECT * FROM `your-project-id.your-dataset.source_table`',
                use_standard_sql=True)
            )

# Define a function to write data to BigQuery
def write_to_bigquery(data):
    return (data
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table='your-project-id.your-dataset.target_table',
                schema='column_1:string,column_2:integer,column_3:boolean',
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
            )

# Define your data processing logic: read from BigQuery, then map each
# row to a dict whose keys match the target schema (WriteToBigQuery
# expects dict elements, not tuples)
data = (read_from_bigquery()
        | 'Process Data' >> beam.Map(lambda row: {
            'column_1': row['column_1'],
            'column_2': row['column_2'],
            'column_3': row['column_3']}))

# Write the processed data to BigQuery
write_to_bigquery(data)

# Run the pipeline
p.run()
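
For completeness, the same flow is often written with a with block, which calls run() and waits for the job to finish when the block exits; a condensed sketch of the same read/process/write steps under the same placeholder names:

    # Context-manager form: run() is called and awaited on block exit
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
             query='SELECT * FROM `your-project-id.your-dataset.source_table`',
             use_standard_sql=True)
         | 'To schema dict' >> beam.Map(lambda row: {
             'column_1': row['column_1'],
             'column_2': row['column_2'],
             'column_3': row['column_3']})
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             table='your-project-id.your-dataset.target_table',
             schema='column_1:STRING,column_2:INTEGER,column_3:BOOLEAN',
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))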
