我想构建一个 Beam 程序:从 Pub/Sub 读取形如下面的消息,并用 BigQuery 中的数据对其进行富化:
{"user_id":"u1"}
这是我的代码(仅包含对 2 个表的查询):
import argparse
import json
import logging
import os
import re

import apache_beam as beam
import apache_beam.io.gcp.pubsub as pubsub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms import window
from google.cloud import bigquery
class extractElement(beam.DoFn):
    """Decode a raw Pub/Sub message payload into a JSON string.

    Input: a PubsubMessage whose ``.data`` is bytes such as
    ``b'{"user_id":"u1"}'``.
    Main output: the UTF-8 decoded string.
    Failure output: ``[step_name, element]`` tagged ``OUTPUT_TAG_FAILURE``
    (the tag is defined in ``__main__`` and reaches workers via
    ``save_main_session``).
    """

    def process(self, element, *args, **kwargs):
        try:
            # Use logging (not print) so messages reach the worker logs.
            logging.debug('extractElement start')
            yield element.data.decode('utf-8')
        except Exception:
            # Route the undecodable message to the failure side output
            # instead of crashing the bundle.
            logging.exception('extractElement failed')
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE,
                                           ['extractElement', element])
class enrich_country(beam.DoFn):
    """Enrich a user id with its country from BigQuery.

    Input: a JSON string such as ``'{"user_id": "u1"}'``.
    Main output: ``(uid, country, status)`` where status is
    ``OUTPUT_TAG_NO_REC`` when no row matched, else ``OUTPUT_TAG_COMPLETE``.
    Failure output: ``[(uid, country, 'enrich_country')]`` tagged
    ``OUTPUT_TAG_FAILURE``.
    """

    def setup(self):
        # One client per DoFn instance instead of one per element —
        # client construction is expensive.
        self._client = bigquery.Client()

    def process(self, element, *args, **kwargs):
        # Pre-initialize so the except branch cannot raise NameError when
        # json.loads fails before uid/country are assigned.
        uid = None
        country = None
        try:
            uid = json.loads(element).get('user_id')
            # Parameterized query instead of string formatting — avoids
            # SQL injection through the incoming message.
            query = ('select country from '
                     '`agolis-allen-first.dataflow_bole.country_dim` '
                     'where user_id=@uid')
            job_config = bigquery.QueryJobConfig(query_parameters=[
                bigquery.ScalarQueryParameter('uid', 'STRING', uid)])
            rows = self._client.query(query, job_config=job_config).result()
            matched = 0
            for row in rows:
                country = row.country
                matched += 1
            status = OUTPUT_TAG_NO_REC if matched == 0 else OUTPUT_TAG_COMPLETE
            yield (uid, country, status)
        except Exception:
            logging.exception('enrich_country failed for element %r', element)
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE,
                                           [(uid, country, 'enrich_country')])
class enrich_history(beam.DoFn):
    """Enrich a user id with its event history from BigQuery.

    Input: a JSON string such as ``'{"user_id": "u1"}'``.
    Main output: ``(uid, event_params, status)`` where event_params is a
    list of ``{'event_date', 'event_name', 'device'}`` dicts and status is
    ``OUTPUT_TAG_NO_REC`` when no row matched, else ``OUTPUT_TAG_COMPLETE``.
    Failure output: ``[(uid, event_params, 'enrich_history')]`` tagged
    ``OUTPUT_TAG_FAILURE``.
    """

    def setup(self):
        # One client per DoFn instance instead of one per element.
        self._client = bigquery.Client()

    def process(self, element, *args, **kwargs):
        # Pre-initialize so the except branch cannot raise NameError when
        # the failure happens before these are assigned.
        uid = None
        event_params = []
        try:
            uid = json.loads(element).get('user_id')
            # Parameterized query instead of string formatting — avoids
            # SQL injection through the incoming message.
            query = ('select event_date,event_name,device from '
                     '`agolis-allen-first.dataflow_bole.event_history` '
                     'where user_id=@uid')
            job_config = bigquery.QueryJobConfig(query_parameters=[
                bigquery.ScalarQueryParameter('uid', 'STRING', uid)])
            rows = self._client.query(query, job_config=job_config).result()
            for row in rows:
                event_params.append({
                    'event_date': row.event_date,
                    'event_name': row.event_name,
                    'device': row.device,
                })
            status = OUTPUT_TAG_NO_REC if not event_params else OUTPUT_TAG_COMPLETE
            yield (uid, event_params, status)
        except Exception:
            logging.exception('enrich_history failed for element %r', element)
            # Step name fixed: the original recorded the typo 'enrich_hisotry'.
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE,
                                           [(uid, event_params, 'enrich_history')])
def run(argv=None, save_main_session=True):
    """Build and run the streaming enrichment pipeline.

    Reads JSON messages from Pub/Sub (topic or subscription), enriches each
    user id with country and event history from BigQuery, then joins the
    two enrichment branches per uid via fixed windows + CoGroupByKey.

    Args:
        argv: command-line args forwarded to argparse / PipelineOptions.
        save_main_session: pickle __main__ globals (OUTPUT_TAG_*) so they
            are available on remote workers.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--outputTable', dest='outputTable', required=True)
    parser.add_argument('--stagingLocation', dest='stagingLocation', required=True)
    parser.add_argument('--tempLocation', dest='tempLocation', required=True)
    parser.add_argument('--runner', dest='runner', required=True)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--inputTopic', dest='inputTopic')
    group.add_argument('--inputSub', dest='inputSub')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(runner=known_args.runner, options=pipeline_options)
    if known_args.inputSub:
        message = p | beam.io.ReadFromPubSub(
            subscription=known_args.inputSub, with_attributes=True)
    else:
        message = p | beam.io.ReadFromPubSub(
            topic=known_args.inputTopic, with_attributes=True)

    # Untagged elements go to the 'outputs' main output; failures go to
    # the OUTPUT_TAG_FAILURE side output. Each branch gets its own
    # failure collection (the original overwrote failure_enrich twice).
    mainData, failure_extract = (
        message
        | 'split' >> beam.ParDo(extractElement()).with_outputs(
            OUTPUT_TAG_FAILURE, main='outputs'))
    enrichCountry, failure_country = (
        mainData
        | 'enrich country' >> beam.ParDo(enrich_country()).with_outputs(
            OUTPUT_TAG_FAILURE, main='outputs'))
    enrichHistory, failure_history = (
        mainData
        | 'enrich history' >> beam.ParDo(enrich_history()).with_outputs(
            OUTPUT_TAG_FAILURE, main='outputs'))

    # CoGroupByKey cannot be applied to an unbounded PCollection in the
    # global window with the default trigger, so both branches are placed
    # into short fixed windows first, then keyed by uid and joined.
    country_by_uid = (
        enrichCountry
        | 'window country' >> beam.WindowInto(window.FixedWindows(10))
        | 'key country' >> beam.Map(lambda rec: (rec[0], rec[1])))
    history_by_uid = (
        enrichHistory
        | 'window history' >> beam.WindowInto(window.FixedWindows(10))
        | 'key history' >> beam.Map(lambda rec: (rec[0], rec[1])))

    processedData = (
        {'country': country_by_uid, 'history': history_by_uid}
        | 'combine record' >> beam.CoGroupByKey()
        | 'flatten record' >> beam.Map(lambda kv: {
            'user_id': kv[0],
            'country': kv[1]['country'][0] if kv[1]['country'] else None,
            'event_history': kv[1]['history'][0] if kv[1]['history'] else [],
        }))
    # TODO(review): write processedData (and the failure_* collections) to
    # known_args.outputTable once its schema is decided.

    # The original never started the pipeline; streaming jobs block here.
    result = p.run()
    result.wait_until_finish()
if __name__ == '__main__':
    # Service-account key used for BigQuery / Pub/Sub auth when running
    # locally ('***.json' is a placeholder path).
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '***.json'
    logging.getLogger().setLevel(logging.INFO)
    # Tags referenced by the DoFns; shipped to workers via save_main_session.
    OUTPUT_TAG_NO_REC = 'Norecord'
    OUTPUT_TAG_COMPLETE = 'complete'
    OUTPUT_TAG_FAILURE = 'failure'
    run()
“enrich_country”步骤的输出包括
uid, country, status,
而
“enrich_history”步骤的输出包括 uid 和事件历史数组(array of event history)。
我想组合这两个 PCollection,生成形如
uid, country, array of event history
的记录。
我尝试在“combine record”步骤中应用 CoGroupByKey(),但报错:“GroupByKey 无法应用于具有全局窗口和默认触发器的无界 PCollection”。
我的问题是:应该如何正确地组合这两个无界 PCollection?
正如 @digitalearth 在评论中提到的:
在 CoGroupByKey 之前对两个无界 PCollection 应用固定窗口(Fixed Windows),即可解决上述错误。
此答案发布为社区 wiki,以帮助将来遇到相同用例的人。
欢迎编辑此答案以补充更多信息。