Problem reading functions in Apache Beam on Dataflow


I'm using Apache Beam on Google Dataflow, and I'm calling 3 functions:

| "Unnest 1" >> beam.Map(lambda record: dict_level1(record))
| "Unnest 2" >> beam.Map(lambda record: unnest_dict(record))
| "Unnest 3" >> beam.Map(lambda record: dict_level0(record))

But when I run the job on Dataflow, I get a "name is not defined" error.

Here is my code:

import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions

# this creates the output and generates the template for me
pipeline_options = {
    'project': 'c3t-tango-dev',
    'runner': 'DataflowRunner',
    'region': 'us-central1',  # make sure to specify the region correctly
    'staging_location': 'gs://dario-dev-gcs/dataflow-course/staging',
    'template_location': 'gs://dario-dev-gcs/dataflow-course/templates/batch_job_df_gcs_flights4'
}


pipeline_options = PipelineOptions.from_dictionary(pipeline_options)

table_schema = 'airport:STRING, list_delayed_num:INTEGER, list_delayed_time:INTEGER'
table = 'c3t-tango-dev:dataflow.flights_aggr'

class Filter(beam.DoFn):
    def process(self, record):
        if int(record[8]) > 0:
            return [record]

def dict_level1(record):
    dict_ = {}
    dict_['airport'] = record[0]
    dict_['list'] = record[1]
    return (dict_)

def unnest_dict(record):
    def expand(key, value):
        if isinstance(value, dict):
            return [(key + '_' + k, v) for k, v in unnest_dict(value).items()]
        else:
            return [(key, value)]

    items = [item for k, v in record.items() for item in expand(k, v)]
    return dict(items)

def dict_level0(record):
    #print("Record in dict_level0:", record)
    dict_ = {}
    dict_['airport'] = record['airport']
    dict_['list_Delayed_num'] = record['list_Delayed_num'][0]
    dict_['list_Delayed_time'] = record['list_Delayed_time'][0]
    return (dict_)

with beam.Pipeline(options=pipeline_options) as p1:
    serviceAccount = "./composer/dags/c3t-tango-dev-591728f351ee.json"
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = serviceAccount

    Delayed_time = (
        p1
        | "Import Data time" >> beam.io.ReadFromText("gs://dario-dev-gcs/dataflow-course/input/voos_sample.csv",
                                                     skip_header_lines=1)
        | "Split by comma time" >> beam.Map(lambda record: record.split(','))
        | "Filter Delays time" >> beam.ParDo(Filter())
        | "Create a key-value time" >> beam.Map(lambda record: (record[4], int(record[8])))
        | "Sum by key time" >> beam.CombinePerKey(sum)
    )

    Delayed_num = (
        p1
        | "Import Data" >> beam.io.ReadFromText("gs://dario-dev-gcs/dataflow-course/input/voos_sample.csv",
                                                 skip_header_lines=1)
        | "Split by comma" >> beam.Map(lambda record: record.split(','))
        | "Filter Delays" >> beam.ParDo(Filter())
        | "Create a key-value" >> beam.Map(lambda record: (record[4], int(record[8])))
        | "Count by key" >> beam.combiners.Count.PerKey()
    )

    Delay_table = (
      {'Delayed_num': Delayed_num, 'Delayed_time': Delayed_time}
      | "Group By" >> beam.CoGroupByKey()
      | "Unnest 1" >> beam.Map(lambda record: dict_level1(record))
      | "Unnest 2" >> beam.Map(lambda record: unnest_dict(record))
      | "Unnest 3" >> beam.Map(lambda record: dict_level0(record))
      #| beam.Map(print)
      | "Write to BQ" >> beam.io.WriteToBigQuery(
        table,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        custom_gcs_temp_location="gs://dario-dev-gcs/dataflow-course/staging")
    )

p1.run()
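
For reference, here is a quick standalone sketch of what a single record looks like after each Unnest step. CoGroupByKey emits (key, {tag: [values]}) tuples; the key 'ATL' and the numbers are made-up toy values:

record = ('ATL', {'Delayed_num': [3], 'Delayed_time': [150]})  # shape of one CoGroupByKey output

step1 = dict_level1(record)
# {'airport': 'ATL', 'list': {'Delayed_num': [3], 'Delayed_time': [150]}}

step2 = unnest_dict(step1)
# {'airport': 'ATL', 'list_Delayed_num': [3], 'list_Delayed_time': [150]}

step3 = dict_level0(step2)
# {'airport': 'ATL', 'list_Delayed_num': 3, 'list_Delayed_time': 150}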

I run this code, which generates a template in GCS; then I create a Dataflow job from a custom template pointing at it, but at runtime I get this error:

  File "/Users/dario/Repo-c3tech/c3t-tango/./composer/dags/gcp_to_bq_table.py", line 76, in
NameError: name 'dict_level1' is not defined

python google-cloud-dataflow beam
1 Answer

Setting the --save_main_session pipeline option to True resolved the error above.

The error:

  File "/Users/dario/Repo-c3tech/c3t-tango/./composer/dags/gcp_to_bq_table.py", line 76, in NameError: name 'dict_level1' is not defined

According to this documentation, the error does not occur when you execute locally, for example with the DirectRunner. It occurs when your DoFns use values from the global namespace that are not available on the Dataflow workers. To resolve it, set the --save_main_session pipeline option to True.
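
A minimal sketch of that change, applied to the options dictionary from the question (only the save_main_session key is new):

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions.from_dictionary({
    'project': 'c3t-tango-dev',
    'runner': 'DataflowRunner',
    'region': 'us-central1',
    'staging_location': 'gs://dario-dev-gcs/dataflow-course/staging',
    'template_location': 'gs://dario-dev-gcs/dataflow-course/templates/batch_job_df_gcs_flights4',
    # Pickle the main session so that module-level functions such as
    # dict_level1 are available on the Dataflow workers.
    'save_main_session': True,
})

Equivalently, it can be set programmatically with pipeline_options.view_as(SetupOptions).save_main_session = True, where SetupOptions is imported from apache_beam.options.pipeline_options.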
