Error "'PDone' object has no attribute 'windowing'" with Dataflow WriteToPubSub


I have a Beam pipeline that reads records from two Postgres CloudSQL databases, applies some data transformations, and pushes the data to Google Pub/Sub via the WriteToPubSub transform.

I can run this pipeline locally with the DirectRunner, where both the CloudSQL connections and publishing to Pub/Sub work fine.
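Locally I select the runner via the pipeline options; roughly, the local invocation differs only in the runner choice (a sketch, since the exact local options aren't shown below):

from apache_beam.options.pipeline_options import PipelineOptions

# Local run of the same pipeline: DirectRunner instead of DataflowRunner,
# so no GCP project, region, or temp_location is needed.
local_options = PipelineOptions(runner='DirectRunner')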

But when I include runner='DataflowRunner' in the pipeline options, the pipeline fails with the error "'PDone' object has no attribute 'windowing'".

The error is raised in the get_windowing function inside Beam's ptransform.py module.
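For what it's worth, one common trigger for this exact error (an assumption on my part, not something the traceback confirms) is chaining another transform after a terminal write: WriteToPubSub returns a PDone rather than a PCollection, and when the windowing of the downstream value is later resolved, get_windowing reads inputs[0].windowing from that PDone, which has no such attribute. A minimal sketch of the pattern:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    messages = pipeline | beam.Create([b'hello'])

    # WriteToPubSub is a terminal transform: its result is a PDone,
    # not a PCollection.
    done = messages | beam.io.WriteToPubSub(
        topic='projects/<project-id>/topics/<topic>')  # placeholder topic

    # Chaining anything onto the PDone hands it to the next transform as
    # an input; resolving that transform's windowing later raises
    # "'PDone' object has no attribute 'windowing'".
    # done | beam.Map(print)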

I'm not sure what difference running the code on the Dataflow runner should make.

Is this some permissions issue that prevents Dataflow from talking to Pub/Sub or CloudSQL, or do I need to specify particular windowing options even when writing to Pub/Sub?
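In case it is the latter, my understanding of "windowing options" would be something like the sketch below, applied before the write; the 60-second fixed window and the topic path are placeholders I made up, and I haven't confirmed this fixes anything:

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

with beam.Pipeline() as pipeline:
    (pipeline
     | beam.Create([b'payload'])
     # Hypothetical: assign fixed 60-second windows before the write.
     | 'Window' >> beam.WindowInto(FixedWindows(60))
     | 'Write' >> beam.io.WriteToPubSub(
         topic='projects/<project-id>/topics/<topic>'))  # placeholder topic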

The following snippet shows the main pipeline flow.

# Invoker code
import argparse
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# ReadDB, MergeTransform, DispatchOptions, list_to_dict, get_hash and
# encode_as_task are project-specific helpers defined elsewhere (not shown).
if __name__ == '__main__':
    # Set up your PostgreSQL connection parameters
    db_config = {
        'host': os.getenv('DB_HOST'),
        'port': os.getenv('DB_PORT'),
        'database': os.getenv('DB_NAME'),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASS')
    }
    parser = argparse.ArgumentParser()
    args, beam_args = parser.parse_known_args()
    print(args)
    publish_topic = os.getenv('PUBLISH_TOPIC')

    # Set up Apache Beam pipeline options
    pipeline_options = PipelineOptions(
        beam_args,
        runner='DataflowRunner',
        project='<gcp-project-id>',
        job_name='dispatch-demo-1',
        temp_location='<bucket path>',
        region='europe-west1')

    dispatch_args = pipeline_options.view_as(DispatchOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Create a dummy input element
        dummy_input = pipeline | beam.Create(['dummy'])

        client_operations_query = f"SELECT * FROM table2 WHERE attr1=abc"
        get_retailer_categories_query = 'SELECT * FROM table1'

        co_rows = (dummy_input
                   | 'Get table1 rows' >> ReadDB(client_operations_query, **db_config)
                   | 'CO list to map' >> beam.Map(list_to_dict, 'internal_category_id'))

        co_rows | 'co_rows' >> beam.Map(print)

        retailer_cat_rows = (dummy_input
                             | 'Get table2 rows' >> ReadDB(get_retailer_categories_query, **db_config)
                             | 'Table2 list to map' >> beam.Map(list_to_dict, 'internal_category_id'))

        retailer_cat_rows | 'retailer_cat_rows' >> beam.Map(print)

        denormalised_co_rows = (({
            'co_rows': co_rows, 'retailer_cat_rows': retailer_cat_rows
        })
            | 'group by cat_ids' >> beam.CoGroupByKey()
            | 'Join by cat_id' >> beam.ParDo(MergeTransform()))

        groupedRows = (denormalised_co_rows
                       | beam.GroupBy(get_hash)
                       | 'ExtractClientIds' >> beam.Map(lambda element: (
                           element[0],
                           [obj['client_id'] for obj in element[1]],
                           element[1]))
                       | 'Convert to string' >> beam.Map(encode_as_task)
                       | 'Write to Pub/Sub' >> beam.io.WriteToPubSub(topic=publish_topic)
                       | 'pubsub out print' >> beam.Map(print))

google-cloud-dataflow apache-beam google-cloud-pubsub