Sedona + Flink JDBC 和 Postgis 数据类型(UUID、地理)

问题描述 投票:0回答:1

我试图连接到 Postgres (Postgis) 并从具有 UUID 和 Geography 列的表中检索数据:

CREATE TABLE area_of_interest (
    id uuid NOT NULL,
    geometry public.geography(multipolygon, 4326) NULL
    CONSTRAINT "id_pk" PRIMARY KEY (id)
);

我已经看到了问题 Flink JDBC UUID – 源连接器,该解决方案不适用于 Flink 1.18。

我尝试使用以下方法注册表,但失败了:

TableDescriptor
    .forConnector("jdbc")
    .option(...)
    ...
    .schema(
        Schema.newBuilder()
            .column("id", DataTypes.STRING().notNull()) // doesn't work -> java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.lang.String (java.util.UUID and java.lang.String are in module java.base of loader 'bootstrap')
            // .column("id", DataTypes.RAW(UUID.class).notNull())  // doesn't work -> Caused by: org.apache.flink.table.api.ValidationException: The PostgreSQL dialect doesn't support type: RAW('java.util.UUID', '...') NOT NULL.
            .column("geometry", DataTypes.STRING().notNull()) // doesn't work -> java.lang.ClassCastException: class org.postgresql.util.PGobject cannot be cast to class java.lang.String (org.postgresql.util.PGobject is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
            // .column("geometry", DataTypes.RAW(Geometry.class))  // doesn't work -> Caused by: org.apache.flink.table.api.ValidationException: The PostgreSQL dialect doesn't support type: RAW('org.locationtech.jts.geom.Geometry', '...')
            // .columnByExpression("geometry", "cast(geometry as varchar)") // doesn't work -> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'geometry'
            // .columnByExpression("geometry", "ST_asEWKT(geometry)") // doesn't work -> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'geometry'
            .primaryKey("id")
            .build())
        .build()

尝试将 java.util.UUID(Postgres 列是

uuid
)列映射到 java.lang.String 时出错:

java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.lang.String (java.util.UUID and java.lang.String are in module java.base of loader 'bootstrap')
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$createInternalConverter$224afae6$10(AbstractJdbcRowConverter.java:176) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]

尝试将 Geography(Postgres 列为

geography(multipolygon, 4326)
)列映射到 java.lang.String 时出错:

java.lang.ClassCastException: class org.postgresql.util.PGobject cannot be cast to class java.lang.String (org.postgresql.util.PGobject is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$createInternalConverter$224afae6$10(AbstractJdbcRowConverter.java:176) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:97) ~[flink-streaming-java-1.18.1.jar:1.18.1]

我想问一下将这些类型从 Postgis 映射到 Flink Table & SQL API 的假定方法是什么,或者其他可以帮助我连接到 Postgis 并检索具有这些类型的列的方法。

我尝试使用不同的方法来映射这些列,但没有成功。

postgis flink-sql apache-sedona
1个回答
0
投票

为了解决这个问题,我在 Postgres 中创建了一个视图:

create or replace
view vw_area_of_interest as
select
    id::text as id,
    st_asewkb(cast(geometry as geometry)) as geometry
from
    area_of_interest
where
    geometry is not null;

并使用这样的模式:

Schema.newBuilder()
    .column("id", DataTypes.STRING().notNull())
    .column("geometry", DataTypes.BYTES().notNull())
    .primaryKey("id")
    .build())
© www.soinside.com 2019 - 2024. All rights reserved.