NameError: name 'beam' is not defined [while running 'Create beam Row-ptransform']


I am working on a use case where I read data from Pub/Sub and want to write the aggregated values to BigQuery.

This is the Pub/Sub input I am publishing to the topic:

b"('B', 'Stream1', 77)"
b"('C', 'Stream3', 11)"

这是我的代码:

import json
import os
import typing
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform

table_spec1 = bigquery.TableReference(
    projectId='phonic-vortex-417406',
    datasetId='training',
    tableId='dflow_agg')


SCHEMA = {
    "fields": [
        {"name": "name", "type": "STRING"},
        {"name": "stream", "type": "STRING"},
        {"name": "salary", "type": "INT64", "mode": "NULLABLE"},
    ]
}

pipeline_options = PipelineOptions(streaming=True)

class ProcessWords(beam.DoFn):
    def process(self, ele):
        # Parse the stringified tuple, e.g. "('B', 'Stream1', 77)" -> ('B', 'Stream1', 77)
        yield eval(ele)

def run():
    with beam.Pipeline(options=pipeline_options) as p:


        out = (
            p
            | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(
                subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
            | "Decode and parse Json" >> beam.Map(lambda element: element.decode("utf-8"))
            | "Formatting" >> beam.ParDo(ProcessWords())  # .with_output_types(CommonLog)
            | "Create beam Row" >> beam.Map(
                lambda x: beam.Row(Name=str(x[0]), Stream=str(x[1]), Salary=int(x[2])))
            | "window" >> beam.WindowInto(beam.window.FixedWindows(30))
            | SqlTransform(
                """
                SELECT
                    Name,
                    AVG(Salary) AS avg_sal
                FROM PCOLLECTION
                GROUP BY Name
                """)
            # SqlTransform yields python objects with attributes corresponding to
            # the outputs of the query. Collect those attributes, as well as
            # window information, into a dict.
            | "Assemble Dictionary" >> beam.Map(
                lambda row, window=beam.DoFn.WindowParam: {
                    "Name": row.Name,
                    "avg_sal": row.avg_sal,
                    "window_start": window.start.to_rfc3339(),
                    "window_end": window.end.to_rfc3339(),
                })
            # | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            #     output_table,
            #     schema=SCHEMA,
            #     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            #     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            # )
            # | beam.MapTuple(lambda N, S, M: {"Name": N, "Stream": S, "Marks": M})
            | beam.Map(print)
        )
    

    # The `with` block runs the pipeline when the context exits.

run()

I am running this script with the following command:

python3 stream_dflow.py --runner=DataflowRunner --project="<PROJECT_ID>" --region="us-east1" --temp_location="gs://datafllow-stg/staging" --save_main_session=True

However, I get this error:

NameError: name 'beam' is not defined [while running 'Create beam Row-ptransform-36']

However, beam is already defined.

Does anyone know what the problem is?

I have tried all of the above, but I still get the error.

google-cloud-platform google-cloud-dataflow apache-beam
1 Answer

For the pipeline options, --save_main_session is a boolean flag, so specifying --save_main_session on its own is sufficient. You do not need to pass a value (--save_main_session=True).
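If you would rather not depend on the command-line flag, save_main_session can also be set on the options object in code. A minimal sketch, following the pattern used in the Beam example pipelines (the rest of the script is assumed unchanged):

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_options = PipelineOptions(streaming=True)
# Pickle the __main__ module so that module-level names such as "beam"
# are available inside the lambdas running on the Dataflow workers.
pipeline_options.view_as(SetupOptions).save_main_session = True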

Also, structure the code so that your run method is called like this:

if __name__ == '__main__':
  run()

The Beam Python SDK pickles the __main__ module, and not calling your run() function from under the if __name__ == '__main__': guard is presumably what caused the problem.
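Putting both suggestions together, the end of the script would look roughly like the sketch below. The pipeline body from the question is assumed unchanged; to_beam_row is a hypothetical helper shown as an alternative to the inline lambda, since importing beam inside the function means the worker does not depend on the pickled __main__ globals at all:

def to_beam_row(x):
    import apache_beam as beam  # local import, resolved on the worker
    return beam.Row(Name=str(x[0]), Stream=str(x[1]), Salary=int(x[2]))


def run():
    with beam.Pipeline(options=pipeline_options) as p:
        ...  # same transforms as in the question, with
             # "Create beam Row" >> beam.Map(to_beam_row)


if __name__ == '__main__':
    run()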
