Apache Beam streaming: write to and read from BigQuery


I am running a streaming pipeline in which I try to write to BigQuery and then read from it. Is there a way to make sure, before reading, that what I have just written actually exists? I am using Python:

write_result_bigquery = (
            read_pubsub
            | f'Deserialize {TABLE_NAME.capitalize()} JSON' >> beam.Map(json.loads)
            | 'Add Timestamp Field' >>  beam.ParDo(AddTimestamp())
            | 'Log writing' >> beam.ParDo(LogResult())
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                f'{PROJECT_ID}:{DATASET}.{TABLE_NAME}',
                schema = table_schema,
                custom_gcs_temp_location = f'gs://{GCS_BUCKET}/tmpWrite/tmp',
                write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                insert_retry_strategy = RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                )
        )
_ = write_result_bigquery
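For reference, AddTimestamp and LogResult are custom DoFns not shown here; a minimal sketch of what an AddTimestamp-like DoFn could look like, assuming it just stamps each record with the current processing time, is:

import datetime
import apache_beam as beam


class AddTimestamp(beam.DoFn):
    # Assumed behaviour: add a processing-time 'timestamp' field to each record.
    def process(self, element):
        element['timestamp'] = datetime.datetime.utcnow().isoformat()
        yield element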

#2. READ DATA > LAST PROCESSED TIMESTAMP
read_bigquery_timestamp = (
            read_pubsub
            | 'Get data' >> beam.Map(read_timestamp_file, GCS_BUCKET, source, timestamp_key_cons_mobile)
            | 'Get timestamp' >> beam.Map(get_timestamp, key = timestamp_key_cons_mobile)
            | 'Create query input' >> beam.Map(generate_query_timestamp, query_ts, PROJECT_ID, DATASET, [TABLE_NAME], [id_col])
            | 'Read BigQuery by timestamp' >> beam.ParDo(ReadFromBigQueryRetryDoFn(), schema = schema_output)
        )

For now I have implemented a custom method that raises an exception when the query returns an empty result, but it is not efficient and I am running into other problems with it. It simply forces the Dataflow streaming job to retry until it finds something.
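
A simplified sketch of that workaround, assuming ReadFromBigQueryRetryDoFn just runs the query with the BigQuery client and raises when nothing comes back (the real implementation differs in the details):

import apache_beam as beam
from google.cloud import bigquery


class ReadFromBigQueryRetryDoFn(beam.DoFn):
    # Simplified sketch of the workaround described above, not the exact code.
    def setup(self):
        self._client = bigquery.Client()  # one client per worker

    def process(self, query, schema=None):
        rows = list(self._client.query(query).result())
        if not rows:
            # Failing the bundle makes Dataflow retry it until rows show up.
            raise RuntimeError(f'No rows yet for query: {query}')
        for row in rows:
            yield dict(row.items())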

Any ideas?

python google-cloud-platform google-cloud-dataflow apache-beam beam
1 Answer

For streaming pipelines, the default API that Apache Beam uses in its built-in BigQuery I/O is the legacy streaming inserts API, which can keep newly written data unavailable for reading for up to about 2 minutes.

To be able to query the data more immediately, you should use the newer Storage Write API instead, like this:

| "Write to BigQuery"
    >> beam.io.WriteToBigQuery(
        f"{PROJECT_ID}:{DATASET}.{TABLE_NAME}",
        schema=table_schema,
        custom_gcs_temp_location=f"gs://{GCS_BUCKET}/tmpWrite/tmp",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API
    )

What the Storage Write API does is buffer the newly written rows and commit them to the table iteratively, in batches. This gives you access to the new data from the buffer almost immediately. It is the recommended way to write to BigQuery from Dataflow and should work for your case.
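
If you also want a quick sanity check, outside the pipeline, that the freshly written rows are already visible, you can query the table directly with the BigQuery client. A minimal sketch, where the project/dataset/table names mirror the question and the timestamp column plus the 2-minute window are only assumptions:

from google.cloud import bigquery

# Placeholder values; use the same PROJECT_ID, DATASET and TABLE_NAME as in the pipeline.
PROJECT_ID, DATASET, TABLE_NAME = 'my-project', 'my_dataset', 'my_table'

client = bigquery.Client(project=PROJECT_ID)
sql = f"""
    SELECT COUNT(*) AS n
    FROM `{PROJECT_ID}.{DATASET}.{TABLE_NAME}`
    WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 MINUTE)
"""
row = next(iter(client.query(sql).result()))
print(f'rows visible from the last 2 minutes: {row.n}')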
