我有一个非常大的 csv 文件。我想“逐一”或“批量”流式传输数据行,并使用 AirFlow 将其放入 (WRITE_APPEND) BigQuery 中。我怎样才能实现这个目标?
我不确定您的用例是否包含所有元素,但我向您建议一个解决方案。
从您的
Airflow
DAG,您可以按以下方式对任务进行排序:
CSV
运算符将 BigQuery
文件加载到 GCSToBigQuery
临时表PythonOperator
调用的 API
并将结果存储在另一个 BigQuery
暂存表中BigQueryInsertJobOperator
进行查询。此查询将连接第一个和第二个临时表之间的信息(第二个是 API
调用的结果),应用一些字段重命名和最小转换。将结果加载到最终的 BigQuery
表中。