当我尝试从数据流写入 BigQuery 时,我试图找出 BigQuery 所期望的正确架构。
我采用了 Apache Beam 流式传输 pubsub-to-pubsub 示例,仅更改了最后一步,该步骤最初流回 pubsub 主题,而是尝试写入 BigQuery 表。
output = (
counts
| 'format' >> beam.Map(format_result)
| 'encode' >>
beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes))
# Write to PubSub.
# pylint: disable=expression-not-assigned
output | beam.io.WriteToPubSub(known_args.output_topic)
我切换到了
output = (
counts
| 'format' >> beam.Map(format_result)
output | beam.io.WriteToBigQuery(
known_args.output_table,
method="STREAMING_INSERTS",
schema='"data":"STRING"',
)
然后我向正在读取的 pubsub 主题发送一些消息。这些消息经过正确处理(以流式传输方式),以便计算相似单词的计数,并尝试将该计数推送到 BigQuery。然后我在 Dataflow 的日志中收到以下错误:
消息:“插入 BigQuery 时出错。将重试。错误为 [{'index': 0, 'errors': [{'message': 'POST https://bigquery.googleapis.com/bigquery/v2/项目/MY-PROJECT/datasets/MY-DATASET/tables/MY-TABLE/insertAll?prettyPrint=false:“rows[0].json”处的值无效(type.googleapis.com/google.protobuf.Struct),“另一个:5"','原因':'错误请求'}]}]"
我刚刚手动发送了 5 条消息,内容为
"And here's another message."
,因此为什么 "another: 5"
是它尝试发送的消息之一。
'{"fields":[{"name": "data", "type": "STRING"}]}'
,以及一些其他变体。 output = (
counts
| "format" >> beam.Map(format_result)
| "encode" >> beam.Map(lambda x: x.encode("utf-8")).with_output_types(bytes)
)
output | beam.io.WriteToBigQuery(
known_args.output_topic,
method="STREAMING_INSERTS",
schema='"data":"STRING"',
)
和
output = (
counts
| "format" >> beam.Map(format_result)
| "encode" >> beam.Map(lambda x: x.encode("utf-8"))
)
output | beam.io.WriteToBigQuery(
known_args.output_topic,
method="STREAMING_INSERTS",
schema='"data":"STRING"',
)
我知道我以某种方式发送了错误的格式,但是(a)错误消息对我完全解决问题没有足够的帮助,并且(b)我在使用数据流推送到BigQuery的行上找不到any示例Python SDK 和窗口流。
最让我困惑的错误的两个要素:
'rows[0].json'
在我看来,它正在尝试(成功??)调用行上的 json
方法,这可能只有在 JavaScript 中以某种方式发生时才有意义。type.googleapis.com/google.protobuf.Struct
,我没有在Dataflow中创建protobuf,也没有在protobuf中创建BigQuery schema。 (BigQuery 有自己的格式,大部分与 protobuf 兼容,但又不同。)您可以更改一些内容来确保 WriteToBigQuery 正常工作:
尝试将模式作为字典传递,而不将其转换为字符串。
table_schema = {
"fields": [
{"name": "col_1", "type": "STRING", "mode": "NULLABLE"},
{"name": "col_2", "type": "STRING", "mode": "NULLABLE"},
{"name": "col_3", "type": "STRING", "mode": "NULLABLE"}
]
}
正确填写所需选项,以缩小调试范围:
beam.io.WriteToBigQuery(
project="project_id",
dataset="dataset_name",
table="table_name",
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
method="STREAMING_INSERTS",
)
您不能将所有数据从 PubSub 推送到 BigQuery 表而不进行清理并具有适当的架构。
您不能根据文档使用 schema=SCHEMA_AUTODETECT 进行流式传输:
流式插入不支持架构自动检测 大查询。仅适用于文件加载