Dataflow BigQuery to BigQuery

Votes: 0 · Answers: 3

I am trying to create a Dataflow script that goes from BigQuery back to BigQuery. Our main table is massive and breaks the extract functionality. I would like to create a simple table (the result of a query) that contains all the relevant information.

SQL query

'Select * from table.orders where paid = false limit 10'
is a simple query just to make sure it works. The real query is more complex, but it joins several tables in the same project.

This seems to work, but I would like to know what I can do to test it. Also, how can I make it run automatically every morning?

import logging
import argparse
import apache_beam as beam

PROJECT='experimental'
BUCKET='temp1/python2'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=test1',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:

        # Read the table rows into a PCollection.
        rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
                query =  'Select * from `table.orders` where paid = false limit 10', 
                use_standard_sql=True))

        # Write the output using a "Write" transform that has side effects.
        rows  | 'Write' >> beam.io.WriteToBigQuery(
                table='orders_test',
                dataset='external',
                project='experimental',
                schema='field1:type1,field2:type2,field3:type3',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
python google-bigquery google-cloud-dataflow apache-beam
3 Answers
0 votes

Running it daily: https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions
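The linked post covers App Engine cron and Cloud Functions in detail. As a rough sketch of the Cloud Functions route (not part of the original answer), the snippet below assumes the pipeline has first been staged as a classic Dataflow template at a hypothetical gs://temp1/templates/orders_extract path; an HTTP-triggered function launches the template, and App Engine cron or Cloud Scheduler can call it every morning. All names and paths are placeholders.

# Hypothetical HTTP-triggered Cloud Function that launches a staged
# Dataflow template; the template path and job name are placeholders.
from googleapiclient.discovery import build

PROJECT = 'experimental'
TEMPLATE_PATH = 'gs://temp1/templates/orders_extract'  # assumed template location


def launch_orders_pipeline(request):
    # Build a client for the Dataflow REST API and launch the template.
    dataflow = build('dataflow', 'v1b3')
    response = dataflow.projects().templates().launch(
        projectId=PROJECT,
        gcsPath=TEMPLATE_PATH,
        body={
            'jobName': 'orders-extract-daily',
            'parameters': {},  # template parameters, if any
        },
    ).execute()
    return str(response)

The pipeline code itself stays the same; only the launch mechanism changes.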

Testing: you can try running it on a smaller dataset. If you were running user code (not just a read and a write), you could test with data from a file and check the expected results. But since you are only doing a read and a write, you will need to test against BigQuery itself.
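For the user-code case mentioned above, a minimal in-memory unit test with Beam's testing utilities could look like the sketch below; the unpaid-order filter is a hypothetical stand-in for real user code, not something in the question's pipeline.

# Minimal unit-test sketch using Beam's testing utilities; the filter
# transform is a hypothetical example of user code under test.
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def test_unpaid_filter():
    rows = [
        {'order_id': 1, 'paid': False},
        {'order_id': 2, 'paid': True},
    ]
    with TestPipeline() as p:
        output = (
            p
            | beam.Create(rows)
            | beam.Filter(lambda row: not row['paid'])
        )
        # The assertion runs when the pipeline finishes.
        assert_that(output, equal_to([{'order_id': 1, 'paid': False}]))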


0 votes

You can schedule the run every morning with Airflow. All you need is a DataFlowPythonOperator that executes a Dataflow pipeline stored in a .py file.

For example, put your working Dataflow pipeline in a script my_dataflow_pipe.py:

import argparse
import apache_beam as beam


def run():
    parser = argparse.ArgumentParser(description='Pipeline BGQ2BGQ')
    parser.add_argument('--job_name', required=True, type=str)
    parser.add_argument('--query', required=True, type=str)
    parser.add_argument('--project', required=True, type=str)
    parser.add_argument('--region', required=True, type=str)
    parser.add_argument('--dataset', required=True, type=str)
    parser.add_argument('--table', required=True, type=str)
    parser.add_argument('--network', required=True, type=str)
    parser.add_argument('--subnetwork', required=True, type=str)
    parser.add_argument('--machine_type', required=True, type=str)
    parser.add_argument('--max_num_workers', required=True, type=int)
    parser.add_argument('--num_workers', required=True, type=int)
    parser.add_argument('--temp_location', required=True, type=str)
    parser.add_argument('--runner', required=True, type=str)
    parser.add_argument('--labels', required=True, type=str)

    opts = parser.parse_args()
    query = opts.query.replace("\n", " ")
    argv = [
        f"--job_name={opts.job_name}", 
        f"--project={opts.project}", f"--region={opts.region}", 
        f"--network={opts.network}", f"--subnetwork={opts.subnetwork}",
        f"--num_workers={opts.num_workers}", f"--max_num_workers={opts.max_num_workers}",
        f"--runner={opts.runner}", f"--temp_location={opts.temp_location}", 
        f"--machine_type={opts.machine_type}", f"--labels={opts.labels}", 
    ]
        
    with beam.Pipeline(argv=argv) as p:

        # ReadFromBigQuery is a PTransform, so apply it directly
        # (no beam.io.Read wrapper needed).
        rows = p | 'read' >> beam.io.ReadFromBigQuery(
                query=query, use_standard_sql=True
        )

        rows  | 'Write' >> beam.io.WriteToBigQuery(
                table=f'{opts.project}:{opts.dataset}.{opts.table}',
                schema='field1:type1,field2:type2,field3:type3',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        )
    
    
if __name__ == '__main__':
    run()

You can then build your Airflow DAG to trigger the Dataflow pipeline:

import datetime
from airflow import models
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
template_searchpath="/home/airflow/gcs/data/"

with models.DAG(
        'MYDAGNAME',
        catchup=False,
        default_args=default_args,
        template_searchpath=template_searchpath,
        start_date=datetime.datetime.now() - datetime.timedelta(days=3),
        schedule_interval='0 4 * * *',  # every day at 04:00 AM UTC
) as dag:
        
    job_name = f"MYJOB-{datetime.datetime.now().strftime('%Y%m%d%H%M')}"
    query = "SELECT field1, field2, field3 FROM MYPROJECT.XXX.xxx"
    dataflow_pyjob = DataFlowPythonOperator(
        task_id="dataflow_pyjob",
        job_name=job_name,
        py_file=template_searchpath+"my_dataflow_pipe.py",
        gcp_conn_id='MY_GCP_CONN_ID',
        options={
            'job_name':job_name, 'query':query, 
            'project':'MYPROJECT', 'region':'MYREGION',
            'dataset':'MYDATASET', 'table':'MYTAB', 
            'network':'MYNET', 'subnetwork':'MYSUBNET',
            'machine_type':'MYMACHTYPE', 
            'max_num_workers':'MYMNW', 'num_workers':'MYNW',
            'runner':'DataflowRunner', 'temp_location':'MYTMPLOC',
        },
        wait_until_finished=True,
    )

where the options argument contains all the parameters required by, and passed to, my_dataflow_pipe.py (labels is filled in automatically by Airflow).


0 votes

I don't have enough reputation to comment, but the link in the accepted answer is broken. The updated URL is:

https://cloud.google.com/blog/products/data-analytics/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions
