我正在研究一个用例,我正在从 PubSub 读取数据,并且想编写聚合数据 值到 bigquery。
这是我正在写给该主题的 PubSub 输入:
b"('B', 'Stream1', 77)"
b"('C', 'Stream3', 11)"
这是我的代码:
import json
import os
import typing
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform
table_spec1 = bigquery.TableReference(
projectId='phonic-vortex-417406',
datasetId='training',
tableId='dflow_agg')
SCHEMA = {
"fields": [
{
"name": 'name',
"type": "STRING",
},
{
"name": 'stream',
"type": "STRING"
},
{
"name": 'salary',
"type": "INT64",
"mode": "NULLABLE"
}
]
}
pipeline_options = PipelineOptions( streaming=True)
class ProcessWords(beam.DoFn):
def process(self, ele):
yield eval(ele)
def run():
with beam.Pipeline(options=pipeline_options) as p:
out= (
p
| "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(subscription="projects/<PROJECT_ID>/subscriptions/Test-sub")
| "Decode and parse Json" >> beam.Map(lambda element: element.decode("utf-8"))
|"Formatting " >> beam.ParDo(ProcessWords()) #.with_output_types(CommonLog)
| "Create beam Row" >> beam.Map(lambda x: beam.Row(Name=str(x[0]),Stream=str(x[1]),Salary=int(x[2])))
|"window" >> beam.WindowInto(beam.window.FixedWindows(30))
| SqlTransform(
"""
SELECT
Name,
AVG(Salary) AS avg_sal
FROM PCOLLECTION
GROUP BY Name
""")
# SqlTransform yields python objects with attributes corresponding to
# the outputs of the query.
# Collect those attributes, as well as window information, into a dict
| "Assemble Dictionary" >> beam.Map(
lambda row,
window=beam.DoFn.WindowParam: {
"Name": row.Name,
"avg_sal": row.avg_sal,
"window_start": window.start.to_rfc3339(),
"window_end": window.end.to_rfc3339()
})
# | "Write to BigQuery" >> beam.io.WriteToBigQuery(
# output_table,
# schema=SCHEMA,
# create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
# )
# |beam.MapTuple(lambda N,S,M : {"Name":N,"Stream":S,"Marks":M})
| beam.Map(print)
)
p.run()
run()
我正在使用以下命令运行此脚本”
python3 stream_dflow.py --runner=DataflowRunner --project="<PROJECT_ID>" --region="us-east1" --temp_location="gs://datafllow-stg/staging" --save_main_session=True
但是,我收到此错误。
NameError: name 'beam' is not defined [while running 'Create beam Row-ptransform-36']
但是,beam 已经定义了。
有谁知道是什么问题吗?
我已经尝试了以上所有方法,但仍然出现错误
对于管道选项,
--save_main_session
是一个布尔值,指定--save_main_session
就足够了。你不需要传递 true(--save_main_session=True
)。
让我们构建代码来调用您的 run 方法
if __name__ == '__main__':
run()
因为beam python pickles main并且不知何故没有在
main下运行你的函数
run()
导致了问题?