无法从 Dataflow 将数据插入 BigQuery(使用 Python SDK)

问题描述 投票:0回答:1

当我尝试从数据流写入 BigQuery 时,我试图找出 BigQuery 所期望的正确架构。

我采用了 Apache Beam 流式传输 pubsub-to-pubsub 示例,仅更改了最后一步,该步骤最初流回 pubsub 主题,而是尝试写入 BigQuery 表。

原文(相关部分)

    output = (
        counts
        | 'format' >> beam.Map(format_result)
        | 'encode' >>
        beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes))

    # Write to PubSub.
    # pylint: disable=expression-not-assigned
    output | beam.io.WriteToPubSub(known_args.output_topic)

我切换到了

    output = (
        counts
        | 'format' >> beam.Map(format_result)

        output | beam.io.WriteToBigQuery(
            known_args.output_table,
            method="STREAMING_INSERTS",
            schema='"data":"STRING"',
        )

错误

然后我向正在读取的 pubsub 主题发送一些消息。这些消息经过正确处理(以流式传输方式),以便计算相似单词的计数,并尝试将该计数推送到 BigQuery。然后我在 Dataflow 的日志中收到以下错误:

消息:“插入 BigQuery 时出错。将重试。错误为 [{'index': 0, 'errors': [{'message': 'POST https://bigquery.googleapis.com/bigquery/v2/项目/MY-PROJECT/datasets/MY-DATASET/tables/MY-TABLE/insertAll?prettyPrint=false:“rows[0].json”处的值无效(type.googleapis.com/google.protobuf.Struct),“另一个:5"','原因':'错误请求'}]}]"

我刚刚手动发送了 5 条消息,内容为

"And here's another message."
,因此为什么
"another: 5"
是它尝试发送的消息之一。

其他详情

  • 该表确实存在。它只有一列,简称为“数据”,类型为 STRING。
  • 我还尝试了更完整的模式,其中也包含表信息,如下所示:
    '{"fields":[{"name": "data", "type": "STRING"}]}'
    ,以及一些其他变体。
  • 我尝试更改发送到 BigQuery 的格式,具有以下变化:
        output = (
            counts
            | "format" >> beam.Map(format_result)
            | "encode" >> beam.Map(lambda x: x.encode("utf-8")).with_output_types(bytes)
        )

        output | beam.io.WriteToBigQuery(
            known_args.output_topic,
            method="STREAMING_INSERTS",
            schema='"data":"STRING"',
        )

        output = (
            counts
            | "format" >> beam.Map(format_result)
            | "encode" >> beam.Map(lambda x: x.encode("utf-8"))
        )

        output | beam.io.WriteToBigQuery(
            known_args.output_topic,
            method="STREAMING_INSERTS",
            schema='"data":"STRING"',
        )

我知道我以某种方式发送了错误的格式,但是(a)错误消息对我完全解决问题没有足够的帮助,并且(b)我在使用数据流推送到BigQuery的行上找不到any示例Python SDK 和窗口流。

最让我困惑的错误的两个要素:

  1. 'rows[0].json'
    在我看来,它正在尝试(成功??)调用行上的
    json
    方法,这可能只有在 JavaScript 中以某种方式发生时才有意义。
  2. 它提到了
    type.googleapis.com/google.protobuf.Struct
    ,我没有在Dataflow中创建protobuf,也没有在protobuf中创建BigQuery schema。 (BigQuery 有自己的格式,大部分与 protobuf 兼容,但又不同。)
python google-bigquery streaming google-cloud-dataflow apache-beam
1个回答
0
投票

您可以更改一些内容来确保 WriteToBigQuery 正常工作:

  • 尝试将模式作为字典传递,而不将其转换为字符串。

    table_schema = {
        "fields": [
           {"name": "col_1", "type": "STRING", "mode": "NULLABLE"},
           {"name": "col_2", "type": "STRING", "mode": "NULLABLE"},
           {"name": "col_3", "type": "STRING", "mode": "NULLABLE"}
        ]
    }
    
  • 正确填写所需选项,以缩小调试范围:

    beam.io.WriteToBigQuery(
        project="project_id",
        dataset="dataset_name",
        table="table_name",
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,          
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method="STREAMING_INSERTS",
      )
    
  • 您不能将所有数据从 PubSub 推送到 BigQuery 表而不进行清理并具有适当的架构。

  • 您不能根据文档使用 schema=SCHEMA_AUTODETECT 进行流式传输:

流式插入不支持架构自动检测 大查询。仅适用于文件加载

© www.soinside.com 2019 - 2024. All rights reserved.