How to access a PCollection from a DB I/O connector in the next step of the pipeline

Question · votes: 0 · answers: 2

I wrote a small pipeline with Apache Beam. It uses beam-postgres as the input connector to create a PCollection from a database table. The code looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_postgres.io import ReadAllFromPostgres
from psycopg.rows import dict_row


def __trigger_bill_fetch_job(self):
    print("Triggering bill-fetch job")
    pipeline = beam.Pipeline()

    read_from_db = ReadAllFromPostgres(
        "host={host} dbname={dbName} user={user} password={password}",
        "SELECT * FROM comparison_bill_data_requests WHERE status='PENDING' AND bill_event_received=true AND bill_detail_event_received=true",
        dict_row,
    )

    result = pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "Print result" >> beam.Map(print)

    pipeline.run().wait_until_finish()
    print("read_from_db done", read_from_db)

With beam.Map(print), the output is:

{'id': '1', 'bill_id': 'bill-1', 'account_id': None, 'bill_event_received': True, 'bill_detail_event_received': True, 'status': 'PENDING', 'commodity_type': None, 'bill_start_date': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'bill_end_date': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'tenant_id': 'tenant-1', 'created_at': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'updated_at': datetime.datetime(2023, 12, 6, 11, 53, 21, 300224)}

How can I parse this into an object? I want to pass it to the next step of the pipeline and access its attributes there. How can I do that?

python apache-spark google-cloud-dataflow apache-beam
2 Answers

0 votes
PCollection<Row> rows = pipeline
    .apply(JdbcIO.<Row>read()
        .withDataSourceConfiguration(DataSourceConfiguration.create(...)
            .withUrl("jdbc:database-url")
            .withUsername("username")
            .withPassword("password"))
        .withQuery("SELECT * FROM my_table")
        .withRowMapper(new JdbcIO.RowMapper<Row>() {
            public Row mapRow(ResultSet resultSet) throws Exception {
                // Implement the mapping logic and return the mapped Row
            }
        }));

// You can now use the PCollection "rows" in subsequent steps of the pipeline
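The RowMapper above has a direct analogue in Python: a plain function that turns a raw result row into a keyed object. A minimal sketch outside Beam (the column list and map_row name are assumptions for illustration):

```python
from typing import Any, Dict, Sequence

# Assumed column order, for illustration only
COLUMNS = ["id", "bill_id", "status", "tenant_id"]


def map_row(raw: Sequence[Any]) -> Dict[str, Any]:
    # Pair each positional value with its column name,
    # mirroring what a JdbcIO RowMapper does with a ResultSet
    return dict(zip(COLUMNS, raw))
```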


0 votes

Judging by the result of print, the beam_postgres package returns a Dict for each element of the PCollection produced by the Postgres input connector.

You can call a function in beam.Map and access the fields of the current PCollection element as a Dict:

from typing import Dict

import apache_beam as beam
from beam_postgres.io import ReadAllFromPostgres
from psycopg.rows import dict_row

def __trigger_bill_fetch_job(self):
    print("Triggering bill-fetch job")
    pipeline = beam.Pipeline()

    read_from_db = ReadAllFromPostgres(
        "host={host} dbname={dbName} user={user} password={password}",
        "SELECT * FROM comparison_bill_data_requests WHERE status='PENDING' AND bill_event_received=true and bill_detail_event_received=true",
        dict_row,
    )

    result = pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "Print result" >> beam.Map(to_fields)

    pipeline.run().wait_until_finish()
    print("read_from_db done", read_from_db)


def to_fields(row: Dict) -> Dict:
    # Each element is a plain Dict, so fields are read by key
    field1 = row["field1"]
    field2 = row["field2"]

    # Transform the fields as needed, then emit the element downstream
    return row

In this example, the to_fields function is called from the beam.Map operator. Inside it we can access the current element of the PCollection (a Dict), pick out the fields we need, and apply transformations to the result.
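If you prefer attribute access over dict keys, the same beam.Map step can convert each row Dict into a dataclass. A minimal sketch keeping only a few of the fields from the printed output (the BillRequest name is an assumption):

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class BillRequest:
    id: str
    bill_id: str
    status: str
    tenant_id: str


def to_bill_request(row: Dict) -> BillRequest:
    # Pick the needed fields out of the Dict emitted by the Postgres read
    return BillRequest(
        id=row["id"],
        bill_id=row["bill_id"],
        status=row["status"],
        tenant_id=row["tenant_id"],
    )
```

In the pipeline this would take the place of to_fields, e.g. `... | beam.Map(to_bill_request) | beam.Map(lambda r: print(r.status))`, so later steps can use r.bill_id, r.status, and so on.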

© www.soinside.com 2019 - 2024. All rights reserved.