在python中bigquery sink后是否可以进行其他处理?

问题描述 投票:0回答:1

我正在编写具有以下过程的管道:

1. Read pubsub messages with attribute 'uid' which is the unique id for this message
2. Store the message in Bigquery, the data format is 
    uid | message data | status
    ------------------------------
      1 | {XXXXX}      | new

3. process the message data 
4. update the message to set the status to 'complete'
    uid | message data | status
    ------------------------------
      1 | {XXXXX}      | complete

我有三个问题:

Q1.bigquery sink后如何继续进一步处理

如果我使用 bigqueryIO 来实现像这样的 step2

p=beam.Pipeline(runner=known_args.runner,options=pipeline_options)
message=(
        p
        |beam.io.ReadFromPubSub(subscription=known_args.inputSub,with_attributes=True))
        |'format complete data output'>> beam.ParDo(format_result_for_bq())
        |'write complete data to bq' >> beam.io.WriteToBigQuery(
            table='XXXXX',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
        )
        |'further process' >> beam.ParDo(further_processing_fn())
    )

step 'further process' 只能得到 beam.io.WriteToBigQuery 的输出而不是原始消息。

我也考虑使用侧输出,但这很难确保在数据成功写入 bigquery 之前不会开始“进一步处理”。

你能帮忙建议一下如何实现目标吗?

Q2。在ParDo里面做bigquery操作合适吗

鉴于上面的流程逻辑,这是我的代码

class saveData(beam.DoFn):
    def process(self, element, *args, **kwargs):
        client=bigquery.Client()
        query="insert into `XXXXX` values ('{}','{}','{}')"\
                    .format(element[1].get('uid'),element[0],'ongoing')
        query_job = client.query(query)
        result = query_job.result()
        res=(element[0],element[1],'ongoing')
        yield res

class businessProcess(beam.DoFn):
   def process(self, element,*args, **kwargs):
       print("business process logic")
       res=(element[0],element[1],'complete')
       yield res

class updateRow(beam.DoFn):
   def process(self, element,*args, **kwargs):
       attribute = element[1]
       query = 'update `XXXXXX` set status = "complete" where uid="{}" ' \
               .format(attribute.get('uid'))
       client=bigquery.Client()
       query_job = client.query(query)
       result=query_job.result()
......
p=beam.Pipeline(runner=known_args.runner,options=pipeline_options)
message=(
        p
        |beam.io.ReadFromPubSub(subscription=known_args.inputSub,with_attributes=True))
        |'insert to bq with status ongoing'>> beam.ParDo(saveData())
        |'business process'>> beam.ParDo(businessProcess())
        |'update bq' >> beam.ParDo(udpateRow())
    )

我不认为这是实现要求的典型光束方法,请您帮忙建议处理此类要求的最佳实践是什么?

Q3。如何将多个输入传递给 ParDo?

如果我需要将 4 个输入项传递给 ParDo 函数,如何在 python 中执行? side input 在我的理解中只支持 2 input,对吗?

google-cloud-dataflow apache-beam
1个回答
1
投票

为了解决您的问题,我建议您采用以下模式:

Topic 1 -> Job Dataflow 1 -> Multi Sink -> Write Result to BigQuery
                                        -> Write Result to Pub Sub Topic 2

Topic 2 -> Job Dataflow 2 -> Apply Business Transformations -> Write Result to BigQuery
  • 第一个
    Dataflow
    作业从主题 1
  • 读取数据
  • 作业 1 应用多接收器,一个用于 BigQuery,另一个用于 Pub Sub 主题 2
  • 第二个
    Dataflow
    作业从主题2读取数据
  • 工作 2 在
    ParDo
    Map
  • 中应用您的业务转型
  • 作业2下沉结果为
    BigQuery

使用此模式,您可以实时应用您的用例。 你需要在 2

Dataflow
工作中分离你的逻辑。

我在你的第二个例子中看到一个

update
,通常
BigQuery
在流媒体模式下使用
append
而不是
update
并且
BigQueryIO
不支持更新。

如果您必须在

BigQuery
中处理重复项,则必须考虑最适合您的方法,并且可以在
Dataflow
流作业(
BigQuery
视图 + 批处理作业)之外处理此需求。

© www.soinside.com 2019 - 2024. All rights reserved.