我正在尝试创建一个从 BigQuery 返回到 BigQuery 的数据流脚本。我们的主表很大,破坏了提取功能。我想创建一个包含所有相关信息的简单表(作为查询的结果)。
SQL 查询
'Select * from table.orders where paid = false limit 10'
是确保其有效的简单查询。真正的查询更复杂,但连接到同一项目中的多个表。
这似乎可行,但我想知道我可以做些什么来测试它? 另外,我怎样才能让它每天早上自动运行?
import logging
import argparse
import apache_beam as beam
PROJECT='experimental'
BUCKET='temp1/python2'
def run():
argv = [
'--project={0}'.format(PROJECT),
'--job_name=test1',
'--save_main_session',
'--staging_location=gs://{0}/staging/'.format(BUCKET),
'--temp_location=gs://{0}/staging/'.format(BUCKET),
'--runner=DataflowRunner'
]
with beam.Pipeline(argv=argv) as p:
# Read the table rows into a PCollection.
rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
query = 'Select * from `table.orders` where paid = false limit 10',
use_standard_sql=True))
# Write the output using a "Write" transform that has side effects.
rows | 'Write' >> beam.io.WriteToBigQuery(
table='orders_test',
dataset='external',
project='experimental',
schema='field1:type1,field2:type2,field3:type3',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
测试——你可以尝试运行一个较小的数据集来测试它。如果您正在运行用户代码(不仅仅是读/写),您可以使用文件中的数据进行测试并检查预期结果。但是由于您只是在进行读/写,因此您需要使用 bigquery 进行测试。
您可以使用 AIRFLOW 安排每天早上跑步。 您只需要 DataFlowPythonOperator 即可执行存储在
.py
文件中的数据流管道。
例如,在脚本中提供您的工作数据流管道
my_dataflow_pipe.py
:
import argparse
import apache_beam as beam
def run():
parser = argparse.ArgumentParser(description='Pipeline BGQ2BGQ')
parser.add_argument('--job_name', required=True, type=str)
parser.add_argument('--query', required=True, type=str)
parser.add_argument('--project', required=True, type=str)
parser.add_argument('--region', required=True, type=str)
parser.add_argument('--dataset', required=True, type=str)
parser.add_argument('--table', required=True, type=str)
parser.add_argument('--network', required=True, type=str)
parser.add_argument('--subnetwork', required=True, type=str)
parser.add_argument('--machine_type', required=True, type=str)
parser.add_argument('--max_num_workers', required=True, type=int)
parser.add_argument('--num_workers', required=True, type=int)
parser.add_argument('--temp_location', required=True, type=str)
parser.add_argument('--runner', required=True, type=str)
parser.add_argument('--labels', required=True, type=str)
opts = parser.parse_args()
query = opts.query.replace("\n", " ")
argv = [
f"--job_name={opts.job_name}",
f"--project={opts.project}", f"--region={opts.region}",
f"--network={opts.network}", f"--subnetwork={opts.subnetwork}",
f"--num_workers={opts.num_workers}", f"--max_num_workers={opts.max_num_workers}",
f"--runner={opts.runner}", f"--temp_location={opts.temp_location}",
f"--machine_type={opts.machine_type}", f"--labels={opts.labels}",
]
with beam.Pipeline(argv=argv) as p:
rows = p | 'read' >> beam.io.Read(
beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
)
rows | 'Write' >> beam.io.WriteToBigQuery(
table=f'{opts.project}:{opts.dataset}.{opts.table}',
schema='field1:type1,field2:type2,field3:type3',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)
if __name__ == '__main__':
run()
您可以构建您的 AIRFLOW dag 来触发数据流管道的执行:
import datetime
from airflow import models
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
}
template_searchpath="/home/airflow/gcs/data/"
with models.DAG(
'MYDAGNAME',
catchup=False,
default_args=default_args,
template_searchpath=template_searchpath,
start_date=datetime.datetime.now() - datetime.timedelta(days=3),
schedule_interval='0 4 * * *', # every day at 04:00 AM UTC
) as dag:
job_name = f"MYJOB-{datetime.datetime.now().strftime('%Y%m%d%H%M')}"
query = "SELECT field1, field2, field3 FROM MYPROJECT.XXX.xxx"
dataflow_pyjob = DataFlowPythonOperator(
task_id="dataflow_pyjob",
job_name=job_name,
py_file=template_searchpath+"my_dataflow_pipe.py",
gcp_conn_id='MY_GCP_CONN_ID',
options={
'job_name':job_name, 'query':query,
'project':'MYPROJECT', 'region':'MYREGION',
'dataset':'MYDATASET', 'table':'MYTAB',
'network':'MYNET', 'subnetwork':'MYSUBNET',
'machine_type':'MYMACHTYPE',
'max_num_workers':'MYMNW', 'num_workers':'MYNW',
'runner':'DataflowRunner', 'tempLocation':'MYTMPLOC',
},
wait_until_finished=True,
)
其中
options
参数包含所有需要并提供给 my_dataflow_pipe.py
的参数(labels 由气流自动填充)。
我没有足够的声誉发表评论,但已接受答案的链接已损坏。更新后的网址是: