如何使用 Airflow 传输数据

问题描述 投票:0回答:1

我有一个非常大的 csv 文件。我想“逐一”或“批量”流式传输数据行,并使用 AirFlow 将其放入 (WRITE_APPEND) BigQuery 中。我怎样才能实现这个目标?

python google-cloud-platform google-bigquery airflow streaming
1个回答
0
投票

我不确定您的用例是否包含所有元素,但我向您建议一个解决方案。

从您的

Airflow
DAG,您可以按以下方式对任务进行排序:

  • 任务 1:通过
    CSV
    运算符将
    BigQuery
    文件加载到
    GCSToBigQuery
    临时表
  • 任务 2:添加包含您的
    PythonOperator
    调用的
    API
    并将结果存储在另一个
    BigQuery
    暂存表中
  • 任务 3:使用
    BigQueryInsertJobOperator
    进行查询。此查询将连接第一个和第二个临时表之间的信息(第二个是
    API
    调用的结果),应用一些字段重命名和最小转换。将结果加载到最终的
    BigQuery
    表中。
© www.soinside.com 2019 - 2024. All rights reserved.