I wrote a small pipeline with Apache Beam. It uses beam-postgres as the input connector to create a PCollection from a database table. The code looks like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from psycopg.rows import dict_row
from beam_postgres.io import ReadAllFromPostgres

def __trigger_bill_fetch_job(self):
    print("Triggering bill-fetch job")
    pipeline = beam.Pipeline()
    read_from_db = ReadAllFromPostgres(
        "host={host} dbname={dbName} user={user} password={password}",
        "SELECT * FROM comparison_bill_data_requests WHERE status='PENDING' AND bill_event_received=true and bill_detail_event_received=true",
        dict_row,
    )
    result = pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "Print result" >> beam.Map(print)
    pipeline.run().wait_until_finish()
    print("read_from_db done", read_from_db)
The output of beam.Map(print) is:
{'id': '1', 'bill_id': 'bill-1', 'account_id': None, 'bill_event_received': True, 'bill_detail_event_received': True, 'status': 'PENDING', 'commodity_type': None, 'bill_start_date': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'bill_end_date': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'tenant_id': 'tenant-1', 'created_at': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'updated_at': datetime.datetime(2023, 12, 6, 11, 53, 21, 300224)}
How can I parse this into an object? I want to pass it to the next step in the pipeline and access its attributes. How should I do that?
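import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// JdbcIO.readRows() derives the Row schema from the query's result-set metadata,
// so no hand-written RowMapper (or Row coder) is needed to get a PCollection<Row>
PCollection<Row> rows = pipeline
    .apply(JdbcIO.readRows()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:database-url")
                .withUsername("username")
                .withPassword("password"))
        .withQuery("SELECT * FROM my_table"));
// Now you can use the PCollection "rows" in later steps of the pipeline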
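Judging from the output of print, the beam_postgres input connector returns a PCollection of Dict elements read from Postgres, one dict per row keyed by column name, because dict_row is passed as the row factory.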
You can call a function in beam.Map and access the fields of the current PCollection element, which is a Dict:
from typing import Dict
import apache_beam as beam
from beam_postgres.io import ReadAllFromPostgres
from psycopg.rows import dict_row

def __trigger_bill_fetch_job(self):
    print("Triggering bill-fetch job")
    pipeline = beam.Pipeline()
    read_from_db = ReadAllFromPostgres(
        "host={host} dbname={dbName} user={user} password={password}",
        "SELECT * FROM comparison_bill_data_requests WHERE status='PENDING' AND bill_event_received=true and bill_detail_event_received=true",
        dict_row,
    )
    result = pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "ToFields" >> beam.Map(to_fields)
    pipeline.run().wait_until_finish()
    print("read_from_db done", read_from_db)

def to_fields(row: Dict) -> Dict:
    # Each element is a dict keyed by column name (produced by dict_row)
    bill_id = row["bill_id"]
    status = row["status"]
    # Use or transform the fields here, then return what the next step needs
    return row
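In this example, the to_fields function is called from the beam.Map transform.
Inside this function you can access the current PCollection element (a Dict), read the fields you need, and apply transformations to the result before returning it to the next step.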
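If you want attribute access (request.bill_id) instead of key lookups, one option is to convert each dict into a typed object in a beam.Map step. The sketch below assumes a hypothetical BillRequest typing.NamedTuple whose fields mirror the columns in the printed output above; BillRequest, to_bill_request and is_for_tenant are illustrative names, not part of beam_postgres. It uses only the standard library, so the conversion can be tested without a database.

```python
import datetime
from typing import Any, Dict, NamedTuple, Optional


# Hypothetical typed record; field names copied from the printed dict
# for comparison_bill_data_requests (adjust to your real schema).
class BillRequest(NamedTuple):
    id: str
    bill_id: str
    account_id: Optional[str]
    bill_event_received: bool
    bill_detail_event_received: bool
    status: str
    commodity_type: Optional[str]
    bill_start_date: datetime.datetime
    bill_end_date: datetime.datetime
    tenant_id: str
    created_at: datetime.datetime
    updated_at: datetime.datetime


def to_bill_request(row: Dict[str, Any]) -> BillRequest:
    # Take only the declared fields, so extra columns returned by
    # SELECT * do not cause a TypeError; a missing column raises KeyError.
    return BillRequest(**{name: row[name] for name in BillRequest._fields})


def is_for_tenant(request: BillRequest, tenant_id: str) -> bool:
    # Downstream steps can now use attribute access instead of row["..."].
    return request.tenant_id == tenant_id
```

In the pipeline this would slot in after the read step, for example: pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "ToBillRequest" >> beam.Map(to_bill_request) | "OnlyTenant1" >> beam.Filter(is_for_tenant, "tenant-1"). beam.Filter forwards the extra "tenant-1" argument to is_for_tenant. A NamedTuple keeps the record immutable and lightweight; a dataclass would work the same way if you need mutable fields.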