我有一个情况想知道是否可以应对。我目前有一个 BigQueryUpsertTableOperator,它会转到 Google 云存储并从其中的所有 csv 文件创建一个外部表。所有这些文件共享完全相同的 18 列,它们只是不同的数据,每天都会出现一个新文件。最近,较新的文件添加了额外的第 19 列。是否仍然可以允许将所有这些文件创建到同一个外部表中?有什么方法可以让这些值在新文件中为空吗?
create_external_table = BigQueryUpsertTableOperator(
task_id=f"create_external_{TABLE}_table",
dataset_id=DATASET,
project_id=INGESTION_PROJECT_ID,
table_resource={
"tableReference": {"tableId": f"{TABLE}_external"},
"externalDataConfiguration": {
"sourceFormat": "CSV",
"allow_quoted_newlines": True,
"allow_jagged_rows":True,
"autodetect": True,
"sourceUris": [f"gs://{ARCHIVE_BUCKET}/{DATASET}_data/*.csv"],
},
"labels": labeler.get_labels_bigquery_table_v2(
target_project=INGESTION_PROJECT_ID,
target_dataset=DATASET,
target_table=f"{TABLE}_external",
),
},
)
我是否需要手动进入这些旧文件并在每个记录末尾添加逗号,并在第一行中添加新列名称,以便将这些文件与新文件并排摄取,或者以任何方式设置选项这个任务?
我已经重复了您的担忧,看来您必须编辑以前的数据才能适合您收到的新文件。
使用新列插入的 csv 进行查询时出现错误:
请注意,您无法直接通过存储桶中的 GCS 文件进行编辑:
对象是由任何格式的文件组成的不可变数据
awk
命令或在 python 中使用 dataframes 在 csv 上添加新列,我相信还有很多其他方法。