我正在寻找有关如何使用 Beam Python SDK 从 Postgres 读取数据/向 Postgres 写入数据的资源。到目前为止,我了解到 apache_beam.io.jdbc 是我们最好的选择(如果有更好的选择,请告诉我)。
我尝试使用它,它能够处理原始数据类型,例如整数和字符串。但是,它无法处理 Postgres 中的 LogicalTypes,例如“timestamp without time zone”类型。
这是我的小实验的一些细节。感谢任何帮助!
Python v3.11.4
apache-beam v2.51.0(Python SDK)
postgres v11.5
DirectRunner
这是管道代码:
with beam.Pipeline(options=None) as p:
pipeline = (
p
| ReadFromJdbc(
table_name="table_name",
driver_class_name='org.postgresql.Driver',
jdbc_url='jdbc:{}://{}:{}/{}'.format("postgresql", "127.0.0.1", "5432", "db_name"),
username="postgres",
password="redacted",
query="SELECT * FROM table_name")
| beam.Map(print)
)
在尝试解析“没有时区的时间戳”列时,会遇到以下错误。我的理解是 LogicalType
MicrosInstant
无法解析时间戳。我可以确认我的时间戳字段的值不为 NULL。
File "apache_beam/coders/coder_impl.py", line 1890, in apache_beam.coders.coder_impl.LogicalTypeCoderImpl.decode_from_stream
File "/Users/archit.shah/PycharmProjects/duplopy-pysql-beam/venv-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 873, in to_language_type
return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
^^^^^^^^^^^^^^^^^^
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
我还在下面的 Java 警告中看到了这一点:
WARNING:root:severity: WARN
timestamp {
seconds: 1697642143
nanos: 162000000
}
message: "Hanged up for url: \"host.docker.internal:58970\"\n."
log_location: "org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer"
thread: "16"
我可能错过了什么吗?我也尝试注册编码器,但结果相同。
在创建管道之前添加这两行应该可以解决问题
from apache_beam.typehints.schemas import MillisInstant
LogicalType.register_logical_type(MillisInstant)
这是由于 Java JdbcIO 使用 joda 时间戳。在 https://github.com/apache/beam/issues/28359 得到解决之前,需要此解决方法