I'm running a streaming pipeline in which I write to BigQuery and then read from it. Is there a way to make sure that what I have just written exists before reading it? I'm using Python:
write_result_bigquery = (
    read_pubsub
    | f'Deserialize {TABLE_NAME.capitalize()} JSON' >> beam.Map(json.loads)
    | 'Add Timestamp Field' >> beam.ParDo(AddTimestamp())
    | 'Log writing' >> beam.ParDo(LogResult())
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        f'{PROJECT_ID}:{DATASET}.{TABLE_NAME}',
        schema=table_schema,
        custom_gcs_temp_location=f'gs://{GCS_BUCKET}/tmpWrite/tmp',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
    )
)
_ = write_result_bigquery
#2. READ DATA > LAST PROCESSED TIMESTAMP
read_bigquery_timestamp = (
    read_pubsub
    | 'Get data' >> beam.Map(read_timestamp_file, GCS_BUCKET, source, timestamp_key_cons_mobile)
    | 'Get timestamp' >> beam.Map(get_timestamp, key=timestamp_key_cons_mobile)
    | 'Create query input' >> beam.Map(generate_query_timestamp, query_ts, PROJECT_ID, DATASET, [TABLE_NAME], [id_col])
    | 'Read BigQuery by timestamp' >> beam.ParDo(ReadFromBigQueryRetryDoFn(), schema=schema_output)
)
At the moment I have implemented a custom method that raises an exception whenever the query returns an empty result, but it is inefficient and causes other problems: it just forces the Dataflow streaming job to retry until it finds something.
Any ideas?
For streaming pipelines, the default API that Apache Beam uses in its built-in BigQuery I/O is the legacy streaming-inserts API, which can leave newly written data unavailable for queries for up to two minutes.
To be able to query the data more promptly, you should use the newer Storage Write API instead, like this:
| "Write to BigQuery"
>> beam.io.WriteToBigQuery(
f"{PROJECT_ID}:{DATASET}.{TABLE_NAME}",
schema=table_schema,
custom_gcs_temp_location=f"gs://{GCS_BUCKET}/tmpWrite/tmp",
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API
)
What the Storage Write API does is buffer newly written rows and commit them to the table iteratively in batches. This gives you near-immediate access to new data from the buffer. It is the recommended way to write to BigQuery from Dataflow and should fit your use case.
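If you still want a guard while the buffer catches up, a plain-Python poll with exponential backoff is gentler than raising an exception to force Dataflow retries. This is an illustrative sketch, not your pipeline code: query_fn is a hypothetical stand-in for whatever runs your BigQuery query and returns a list of rows.

```python
import time

def poll_until_rows(query_fn, max_attempts=5, base_delay=1.0):
    """Call query_fn until it returns a non-empty result.

    Waits base_delay * 2**attempt seconds between attempts
    (exponential backoff) and gives up after max_attempts tries.
    """
    for attempt in range(max_attempts):
        rows = query_fn()
        if rows:
            return rows
        time.sleep(base_delay * (2 ** attempt))
    raise TimeoutError(f"No rows after {max_attempts} attempts")

# Simulated query: empty for the first two calls, then one row.
calls = {"n": 0}
def fake_query():
    calls["n"] += 1
    return [] if calls["n"] < 3 else [{"id": 1}]

print(poll_until_rows(fake_query, base_delay=0.01))  # → [{'id': 1}]
```

With the Storage Write API the backoff should rarely get past the first attempt, but the cap on max_attempts keeps a genuinely missing row from retrying forever.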