Beam job runs successfully on my local machine but fails on the Dataflow runner


I built a Beam job that:

  1. reads data from Pub/Sub (messages like
     {"user_id":"u1", "event_name":"logout", "region":"US"}),
  2. retrieves data in parallel from separate BigQuery tables,
  3. joins the pieces of data together,
  4. writes the result back to BigQuery.

Here is the code:

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms import window
from google.cloud import bigquery
import os
import logging,json
from typing import Tuple,Iterable,Dict
from apache_beam.io.gcp.bigquery_tools import RetryStrategy


def run(argv=None,save_main_session=True):
    parser=argparse.ArgumentParser()
    parser.add_argument('--outputTable',
                       dest='outputTable',
                       required=True)
    parser.add_argument('--stagingLocation',
                       dest='stagingLocation',
                       required=True)
    parser.add_argument('--tempLocation',
                       dest='tempLocation',
                       required=True)
    parser.add_argument('--runner',
                       dest='runner',
                       required=True)

    group=parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--inputTopic',
                       dest='inputTopic')
    group.add_argument('--inputSub',
                       dest='inputSub')

    known_args,pipeline_args=parser.parse_known_args(argv)
    pipeline_options=PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session=save_main_session
    pipeline_options.view_as(StandardOptions).streaming=True


    p=beam.Pipeline(runner=known_args.runner,options=pipeline_options)
    if known_args.inputSub:
       message=(
            p|beam.io.ReadFromPubSub(subscription=known_args.inputSub,with_attributes=True))
    else:
       message=(
           p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))

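    # Parse the Pub/Sub payload; for 'logout' events emit (user_id, raw JSON string)
    # on the main output, or a (step_name, user_id) tuple on the failure side output
    # when an exception is raised.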
    def extract_element_Fn(element)->Tuple[str,Dict]:
        try:
            print("extractElement Start")
            data = element.data.decode('utf-8')
            if json.loads(data).get('event_name') == 'logout':
                user_id = json.loads(data).get('user_id')
                # raise Exception("extract fail")
                """https://stackoverflow.com/questions/53912918/difference-between-beam-pardo-and-beam-map-in-the-output-type
                Best practice: output list in ParDo(),single object in Map
                """
                return (user_id, data)
        except Exception as err:
            step_name = 'extractElement'
            failure=(step_name,user_id)
            return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

    mainData,extract_failure=(
        message
        |'filter logout event'>>beam.Map(extract_element_Fn).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )

    windowData=(
        mainData
        |'window' >> beam.WindowInto(window.FixedWindows(5,0))
        |'group by key' >> beam.GroupByKey()
    )

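    # Look up the user's country in BigQuery; emit (user_id, country) on the main
    # output, (user_id, None) when no row matches, or a failure record on error.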
    def enrich_country_Fn(element)->Tuple[str,str]:
        try:
            print("Enrich Country Start")
            user_id=element[0]
            # raise Exception("enrich country fail")
            query = 'select country from `agolis-allen-first.dataflow_bole.country_dim` where user_id="{}"' \
               .format(user_id)
            client=bigquery.Client()
            query_job = client.query(query)
            result=query_job.result()

            status=None
            country=None
            len_result = 0
            for row in result:
                country=row.country
                len_result+=1

            if len_result == 0:
                status=OUTPUT_TAG_NO_REC
                return (user_id,None)
            else:
                status = OUTPUT_TAG_COMPLETE

            return (user_id,country)
        except Exception as err:
            step_name = 'enrich_country'
            failure = (step_name,user_id)
            return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

    enrichCountry,country_failure = (
        windowData
        |'enrich country via ParDo' >> beam.Map(enrich_country_Fn).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )

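    # Fetch the user's event history from BigQuery and emit (user_id, [event dicts]),
    # or (user_id, None) when there is no history; failures go to the failure side
    # output as (step_name, user_id).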
    def enrich_history_Fn(element)->Tuple[str,Dict]:
        try:
            print("Enrich History Start")
            user_id=element[0]
            # raise Exception("enrich history fail")
            query = 'select event_date,event_name,device from `agolis-allen-first.dataflow_bole.event_history` where user_id="{}"' \
               .format(user_id)
            client=bigquery.Client()
            query_job = client.query(query)
            result=query_job.result()

            status=None
            event_params=[]

            len_result = 0
            for row in result:
                single_event_params={}
                single_event_params['event_date']=row.event_date
                single_event_params['event_name'] = row.event_name
                single_event_params['device'] = row.device
                event_params.append(single_event_params)
                len_result+=1

            if len_result == 0:
                status=OUTPUT_TAG_NO_REC
                return(user_id,None)
            else:
                status = OUTPUT_TAG_COMPLETE

            return (user_id,event_params)
        except Exception as err:
            step_name = 'enrich_history'
            failure = (step_name,user_id)
            return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

    enrichHistory,history_failure = (
        windowData
        |'enrich history' >> beam.Map(enrich_history_Fn).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )

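    # Combine the CoGroupByKey result into a single row: the user_id, the country
    # from the first lookup, and the event list from the second lookup.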
    def merge_data(element):
        print("Merge Data Start")
        result_json={}
        result_json["user_id"]=element[0]
        result_json["country"]=element[1][0][0]
        result_json["events"]=element[1][1][0]
        return result_json

    processedData = (
        (enrichCountry,enrichHistory)
        |beam.CoGroupByKey()
        |'combine data' >> beam.Map(merge_data)
        |'write complete data to bq' >> beam.io.WriteToBigQuery(
        table='agolis-allen-first:dataflow_bole.result',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
        )
    )

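    # Turn a (step_name, user_id) failure record into a row for the error table.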
    def parse_failure(element):
        print("parse_failure")
        result_json={}
        result_json["step_name"]=element[0]
        result_json["user_id"]=element[1]
        return result_json

    failed_data=(
        (extract_failure,country_failure,history_failure)
        |"flattern" >> beam.Flatten()
        |"format failure data" >> beam.Map(parse_failure)
        | 'write failure data to bq' >> beam.io.WriteToBigQuery(
        table='agolis-allen-first:dataflow_bole.result_err',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
        )
    )

    p.run().wait_until_finish()

if __name__ == '__main__':
    path_to_credential = '/Users/wangez/Downloads/GCP_Credentials/agolis-allen-first-13f3be86c3d1.json'
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path_to_credential
    logging.getLogger().setLevel(logging.INFO)

    OUTPUT_TAG_NO_REC = 'Norecord'
    OUTPUT_TAG_COMPLETE = 'complete'
    OUTPUT_TAG_FAILURE = 'failure'

    run()

This code runs successfully on my local machine; however, when I try to run it on Dataflow, I get the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 297, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 372, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1051, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1533, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 208, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 265, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1495, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1506, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1055, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 209, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 717, in apache_beam.coders.coder_impl.MapCoderImpl.estimate_size
AttributeError: 'str' object has no attribute 'items' [while running 'filter logout event/Map(extract_element_Fn)-ptransform-76']
Here is the command used to launch the Dataflow pipeline:

python -m dataflow_bole_logout_event_complex_Map --outputTable agolis-allen-first:experiment.dataflow_insert --region us-central1 --stagingLocation gs://agolis-allen-first-dataflow/staging --tempLocation gs://agolis-allen-first-dataflow/temp --temp_location gs://agolis-allen-first-dataflow/temp --staging_location gs://agolis-allen-first-dataflow/staging --inputTopic projects/agolis-allen-first/topics/demo --runner DataflowRunner --project agolis-allen-first --network=first-vpc
Could you advise how to handle this problem? Thanks in advance.

google-cloud-dataflow apache-beam
1 Answer
This is an encoding/decoding issue. The DirectRunner is fairly lenient when it comes to decoding/encoding elements between steps; Dataflow, by contrast, is really strict. In fact, I believe the DirectRunner does not actually decode/encode between steps at all, whereas Dataflow does.
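
As a rough illustration (my own sketch, assuming a recent Beam SDK, not code from the question): the coder Beam infers from a Tuple[str, Dict] hint uses a MapCoder for the Dict component, and that coder iterates over .items() of the value, which is exactly the call that fails on a plain string in your worker stack trace:

from apache_beam import coders
from apache_beam.typehints import typehints

# Coder inferred for a declared output type of Tuple[str, Dict[str, str]]
inferred = coders.registry.get_coder(typehints.Tuple[str, typehints.Dict[str, str]])

inferred.encode(('u1', {'event_name': 'logout'}))  # works: the value really is a dict
inferred.encode(('u1', 'just a string'))           # AttributeError: 'str' object has no attribute 'items'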

In any case, inside extract_element_Fn you return the following when an exception occurs:

failure=(step_name,user_id)
return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)
However, user_id is a string, while you told Dataflow that your output type is Tuple[str,Dict]. During decoding/encoding it therefore tries to loop over the items of a dictionary, but since user_id is a string, it has no attribute items.
Solution: either skip the type hints (in a Beam context they are more than just hints, see here), or make sure your tagged output actually returns a dict in the output tuple.
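
For example, here is a minimal sketch of the first option (my own rewrite, untested against your exact pipeline): drop the return annotation so the runner does not infer a MapCoder for the value, and initialise user_id so the failure record is always well defined:

def extract_element_Fn(element):   # no "-> Tuple[str, Dict]" annotation
    user_id = None
    try:
        data = element.data.decode('utf-8')
        parsed = json.loads(data)
        if parsed.get('event_name') == 'logout':
            user_id = parsed.get('user_id')
            return (user_id, data)
    except Exception:
        # The failure record stays a (step_name, user_id) tuple of strings;
        # without the Dict hint the inferred coder can handle both outputs.
        return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, ('extractElement', user_id))

If you would rather keep the annotation, then both the main output value and the failure record need to actually be dicts (for example (user_id, parsed) on the main output and a {'step_name': ..., 'user_id': ...} dict on the failure output), with the downstream parse_failure step adjusted to match.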
