创建我设置的表格时
ingress_at DATETIME DEFAULT CURRENT_DATETIME
在 BigQuery 中查看我的架构时,我可以看到它有效:
我的存储桶中有一个 .json 文件,其中包含与表相同的所有列(除了 ingress_at)。我现在尝试将此文件中的数据附加到现有表中,数据加载正常,但我在整个 ingress_at 表中始终以 NULL 值结束。为什么没有按照指定的入口日期时间填充?
我已经尝试过:
在创建表时在架构中设置 CURRENT_DATETIME() 而不是 CURRENT_DATETIME => 没有更改
将 ingress_at 设为必需字段 => 加载作业失败,因为导入文件中没有此类列
配置加载作业时设置自动检测 TRUE => 错误,因为文件没有标题且 ingress_at 列不存在
为加载作业定义架构 => 加载正常,但 ingress_at 列完全填充有 NULL 值
在 Google BigQuery 中,加载作业期间不会应用表架构中指定的默认值。但是,通过 SQL INSERT 语句插入数据时将使用默认值。当您将数据加载到 BigQuery 中时,如果源数据中缺少某列,则无论表架构中指定的默认值如何,都会用 NULL 值填充该列。
解决方法 1:
创建临时表。 将 JSON 数据加载到与原始表具有相同架构但没有 ingress_at 列的临时表中。
使用默认值插入数据:使用 SQL INSERT 语句将数据从临时表插入目标表,并将 ingress_at 设置为 CURRENT_DATETIME()。 这是示例代码:
插入your_target_table(column1,column2,...,ingress_at) 选择列 1、列 2、...、CURRENT_DATETIME() 来自 your_staging_table;
解决方法 2: 如果您有更复杂的 ETL 管道,请考虑使用 Google Cloud Dataflow 在加载数据时转换数据。 Dataflow 允许您处理数据并将 CURRENT_DATETIME() 添加到 ingress_at 字段,然后再将其加载到 BigQuery 中,如下所示:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime
class AddIngressAt(beam.DoFn):
def process(self, element):
element['ingress_at'] = datetime.utcnow().isoformat()
yield element
def run():
pipeline_options = PipelineOptions()
with beam.Pipeline(options=pipeline_options) as p:
(p
| 'ReadFromGCS' >> beam.io.ReadFromText('gs://your-bucket/your-file.json')
| 'ParseJson' >> beam.Map(json.loads)
| 'AddIngressAt' >> beam.ParDo(AddIngressAt())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
'your-project:your_dataset.your_table',
schema='column1:STRING, column2:STRING, ..., ingress_at:TIMESTAMP',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
)
if __name__ == '__main__':
run()