Apache Beam Python SDK - 使用 JDBC io 从 Postgres 读取

问题描述 投票:0回答:1

我正在寻找有关如何使用 Beam Python SDK 从 Postgres 读取数据/向 Postgres 写入数据的资源。到目前为止,我了解到 apache_beam.io.jdbc 是我们最好的选择(如果有更好的选择,请告诉我)。

我尝试使用它,它能够处理原始数据类型,例如整数和字符串。但是,它无法处理 Postgres 中的 LogicalTypes,例如“timestamp without time zone”类型。

这是我的小实验的一些细节。感谢任何帮助!

Python v3.11.4
apache-beam v2.51.0(Python SDK)
postgres v11.5
DirectRunner

这是管道代码:

with beam.Pipeline(options=None) as p:
    pipeline = (
        p
        | ReadFromJdbc(
            table_name="table_name",
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:{}://{}:{}/{}'.format("postgresql", "127.0.0.1", "5432", "db_name"),
            username="postgres",
            password="redacted",
            query="SELECT * FROM table_name")
        | beam.Map(print)
    )

在尝试解析“没有时区的时间戳”列时,会遇到以下错误。我的理解是 LogicalType

MicrosInstant
无法解析时间戳。我可以确认我的时间戳字段的值不为 NULL。

File "apache_beam/coders/coder_impl.py", line 1890, in apache_beam.coders.coder_impl.LogicalTypeCoderImpl.decode_from_stream
  File "/Users/archit.shah/PycharmProjects/duplopy-pysql-beam/venv-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 873, in to_language_type
    return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
                             ^^^^^^^^^^^^^^^^^^
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'

我还在下面的 Java 警告中看到了这一点:

WARNING:root:severity: WARN
timestamp {
  seconds: 1697642143
  nanos: 162000000
}
message: "Hanged up for url: \"host.docker.internal:58970\"\n."
log_location: "org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer"
thread: "16"

我可能错过了什么吗?我也尝试注册编码器,但结果相同。

apache-beam apache-beam-io apache-beam-jdbcio
1个回答
0
投票

在创建管道之前添加这两行应该可以解决问题

from apache_beam.typehints.schemas import MillisInstant

LogicalType.register_logical_type(MillisInstant)

这是由于 Java JdbcIO 使用 joda 时间戳。在 https://github.com/apache/beam/issues/28359 得到解决之前,需要此解决方法

© www.soinside.com 2019 - 2024. All rights reserved.