Joining multiple PCollections in a global window


I want to build a Beam pipeline that will:

  1. Stream messages from Pub/Sub, each one like
    {"user_id":"u1"}
  2. Use the user ID to retrieve data from 7+ BigQuery tables. Because of performance concerns, I need to run these queries in parallel.
  3. Join the results of the 7+ queries to build one complete record.

Here is my code (it only contains the queries against 2 of the tables):

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from google.cloud import bigquery
import os
import logging
import json

class extractElement(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # The input is a PubsubMessage whose payload is b'{"user_id":"u1"}', so decode it.
        try:
            print("extractElement Start")
            data = element.data.decode('utf-8')
            yield data
        except Exception as err:
            step_name = 'extractElement'
            failure = [step_name, element]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

class enrich_country(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # The input is a JSON string like '{"user_id":"u1"}'; no decoding needed.
        # Initialize before the try block so the except handler never hits
        # an unbound variable if json.loads fails.
        uid = None
        country = None
        try:
            print("Enrich Country Start")
            uid = json.loads(element).get('user_id')
            query = 'select country from `agolis-allen-first.dataflow_bole.country_dim` where user_id="{}"' \
                .format(uid)
            client = bigquery.Client()
            query_job = client.query(query)
            result = query_job.result()

            len_result = 0
            for row in result:
                country = row.country
                len_result += 1

            if len_result == 0:
                status = OUTPUT_TAG_NO_REC
            else:
                status = OUTPUT_TAG_COMPLETE

            yield (uid, country, status)
        except Exception as err:
            step_name = 'enrich_country'
            failure = [(uid, country, step_name)]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

class enrich_history(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # The input is a JSON string like '{"user_id":"u1"}'; no decoding needed.
        # Initialize before the try block so the except handler never hits
        # an unbound variable if json.loads fails.
        uid = None
        event_params = []
        try:
            print("Enrich history started")
            uid = json.loads(element).get('user_id')
            query = 'select event_date,event_name,device from `agolis-allen-first.dataflow_bole.event_history` where user_id="{}"' \
                .format(uid)
            client = bigquery.Client()
            query_job = client.query(query)
            result = query_job.result()

            len_result = 0
            for row in result:
                single_event_params = {
                    'event_date': row.event_date,
                    'event_name': row.event_name,
                    'device': row.device,
                }
                event_params.append(single_event_params)
                len_result += 1

            if len_result == 0:
                status = OUTPUT_TAG_NO_REC
            else:
                status = OUTPUT_TAG_COMPLETE

            yield (uid, event_params, status)
        except Exception as err:
            step_name = 'enrich_history'  # fixed typo: was 'enrich_hisotry'
            failure = [(uid, event_params, step_name)]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

def run(argv=None,save_main_session=True):
    parser=argparse.ArgumentParser()
    parser.add_argument('--outputTable',
                       dest='outputTable',
                       required=True)
    parser.add_argument('--stagingLocation',
                       dest='stagingLocation',
                       required=True)
    parser.add_argument('--tempLocation',
                       dest='tempLocation',
                       required=True)
    parser.add_argument('--runner',
                       dest='runner',
                       required=True)

    group=parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--inputTopic',
                       dest='inputTopic')
    group.add_argument('--inputSub',
                       dest='inputSub')

    known_args,pipeline_args=parser.parse_known_args(argv)
    pipeline_options=PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session=save_main_session
    pipeline_options.view_as(StandardOptions).streaming=True


    p=beam.Pipeline(runner=known_args.runner,options=pipeline_options)
    if known_args.inputSub:
        message = (
            p | beam.io.ReadFromPubSub(subscription=known_args.inputSub, with_attributes=True))
    else:
        message = (
            p | beam.io.ReadFromPubSub(topic=known_args.inputTopic, with_attributes=True))

    # with_outputs(OUTPUT_TAG_FAILURE, main='outputs'):
    # any element yielded without an explicit tag goes to the main output 'outputs'.
    mainData, failure_extractElement = (
        message | 'split' >> beam.ParDo(extractElement()).with_outputs(OUTPUT_TAG_FAILURE, main='outputs')
    )

    enrichCountry, failure_enrich_country = (
        mainData | 'enrich country' >> beam.ParDo(enrich_country()).with_outputs(OUTPUT_TAG_FAILURE, main='outputs')
    )

    # Use a distinct variable here; reusing failure_enrich for both
    # enrichments would silently overwrite the country failures.
    enrichHistory, failure_enrich_history = (
        mainData | 'enrich history' >> beam.ParDo(enrich_history()).with_outputs(OUTPUT_TAG_FAILURE, main='outputs')
    )

    processedData = (
        (enrichCountry, enrichHistory)
        | 'combine record' >> ????  # <- this is the step I don't know how to write
    )

    p.run()


if __name__ == '__main__':
    path_to_credential = '***.json'
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path_to_credential
    logging.getLogger().setLevel(logging.INFO)

    OUTPUT_TAG_NO_REC = 'Norecord'
    OUTPUT_TAG_COMPLETE = 'complete'
    OUTPUT_TAG_FAILURE = 'failure'

    run()

The output of "enrich country" contains

uid, country, status

and the output of "enrich history" contains

uid, array of event history

I want to combine the two PCollections to produce records like

uid, country, array of event history

I tried applying CoGroupByKey() in the 'combine record' step, but I was told that "GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger".
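Concretely, what I tried looked roughly like this (a sketch; I first keyed each enrichment output by uid):

keyed_country = enrichCountry | 'key country' >> beam.Map(lambda rec: (rec[0], rec[1]))
keyed_history = enrichHistory | 'key history' >> beam.Map(lambda rec: (rec[0], rec[1]))

processedData = (
    {'country': keyed_country, 'history': keyed_history}
    | 'combine record' >> beam.CoGroupByKey()
)

Both inputs are unbounded and still in the global window with the default trigger, which is exactly what the error message complains about.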

My questions are:

  1. In my case, how can I join multiple PCollections in the global window?
  2. Is it possible to define a trigger for the global window?
  3. I also considered side inputs, but it seems a "side input" does not support multiple PCollections, right? If it does not, is there any sample code I can refer to? (A rough sketch of what I have in mind follows this list.)
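For a single table, what I imagined is roughly the sketch below: load the whole dimension table once as a bounded PCollection and pass it to the main stream as an AsDict side input, instead of querying BigQuery per element. But I don't see how this scales cleanly to 7+ tables:

country_map = (
    p
    | 'read country dim' >> beam.io.ReadFromBigQuery(
        query='SELECT user_id, country FROM `agolis-allen-first.dataflow_bole.country_dim`',
        use_standard_sql=True)
    | 'to kv' >> beam.Map(lambda row: (row['user_id'], row['country'])))

# Look each user up in the in-memory dict built from the side input.
enriched = mainData | 'lookup country' >> beam.Map(
    lambda element, countries: (json.loads(element)['user_id'],
                                countries.get(json.loads(element)['user_id'])),
    countries=beam.pvalue.AsDict(country_map))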
Tags: google-cloud-dataflow, apache-beam
1 Answer

@digitalearth, as you mentioned in the comments:

Using a fixed window overcomes the problem described above.
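A minimal sketch of that fix against the code in the question (the 10-second window length is an assumption; tune it to your latency needs): apply WindowInto with FixedWindows to each enrichment output before keying and joining, so CoGroupByKey has window boundaries to fire on.

from apache_beam.transforms import window

# Assign both enrichment outputs to 10-second fixed windows and key them
# by uid; CoGroupByKey can then fire once per window instead of waiting
# on the never-closing global window.
country_kv = (
    enrichCountry
    | 'window country' >> beam.WindowInto(window.FixedWindows(10))
    | 'key country' >> beam.Map(lambda rec: (rec[0], rec[1])))

history_kv = (
    enrichHistory
    | 'window history' >> beam.WindowInto(window.FixedWindows(10))
    | 'key history' >> beam.Map(lambda rec: (rec[0], rec[1])))

processedData = (
    {'country': country_kv, 'history': history_kv}
    | 'combine record' >> beam.CoGroupByKey())
# Each result is (uid, {'country': [...], 'history': [...]}).

With 7+ tables, add all seven keyed, windowed PCollections to the dict passed to CoGroupByKey; note that elements for the same uid only join if they land in the same window.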

Posting this answer as a community wiki for the benefit of the community that might encounter this use case in the future.

Feel free to edit this answer with additional information.
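On question 2: yes, a trigger can be defined on the global window, which also lifts the GroupByKey restriction. A hedged sketch (per-element trigger, discarding fired panes):

from apache_beam.transforms import window, trigger

# Stay in the global window but fire a pane after every element,
# discarding data that has already fired.
triggered_country = enrichCountry | 'trigger per element' >> beam.WindowInto(
    window.GlobalWindows(),
    trigger=trigger.Repeatedly(trigger.AfterCount(1)),
    accumulation_mode=trigger.AccumulationMode.DISCARDING)

Be aware that with per-element firing, a CoGroupByKey pane may be emitted before the matching element from the other PCollection has arrived, so for this join the fixed-window approach above is usually the simpler choice.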
