如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?

问题描述 投票:0回答:2

我有一个 PostGIS + Debezium/Kafka + Debezium/Connect 设置,可将更改从一个数据库流式传输到另一个数据库。我一直在通过 Kowl 观看消息,一切都在相应地进行。

我的问题依赖于当我从 Kafka 主题读取消息时,特别是几何 (wkb) 列。

这是我的卡夫卡消息:

{
    "schema":{
        "type":"struct"
        "fields":[...]
        "optional":false
        "name":"ecotx_geometry_kafka.ecotx_geometry_impo..."
    }
    "payload":{
        "before":NULL
        "after":{
            "id":"d6ad5eb9-d1cb-4f91-949c-7cfb59fb07e2"
            "type":"MultiPolygon"
            "layer_id":"244458fa-e6e0-4c6c-a7e1-5bf0afce2fb8"
            "geometry":{
                "wkb":"AQYAACBqCAAAAQAAAAEDAAAAAQAAAAUAAABwQfUo..."
                "srid":2154
            }
        "custom_style":NULL
        "style_id":"default_layer_style"
    }
    "source":{...}
    "op":"c"
    "ts_ms":1618854994546
    "transaction":NULL
    }
}

可以看出,WKB 信息类似于“AQAAAAA...”,尽管我的数据库中插入的信息是“01060000208A7A000000000000”或“LINESTRING(0 0,1 0)”。

而且我不知道如何在我的消费者应用程序(Kotlin/Java)中将其解析/转换为 ByteArray 或几何图形,以便在 GeoTools 中进一步使用。

我不知道我是否缺少能够翻译此信息的导入。

我对人们发布 json 消息有几个问题,并且每条具有 geom 字段(通过 Debezium 流式传输)的消息都更改为“AAAQQQAAAA”。

话虽如此,我如何解析/解码/翻译它为GeoTools可以使用的东西?

谢谢。

@更新

附加信息:

插入后,当我分析插槽更改(使用 pg_logic_slot_get_changes 函数查询数据库)时,我可以在 WKB 中看到我的更改:

{"change":[{"kind":"insert","schema":"ecotx_geometry_import","table":"geometry_data","columnnames":["id","type","layer_id","geometry","custom_style","style_id"],"columntypes":["uuid","character varying(255)","uuid","geometry","character varying","character varying"],"columnvalues":["469f5aed-a2ea-48ca-b7d2-fe6e54b27053","MultiPolygon","244458fa-e6e0-4c6c-a7e1-5bf0afce2fb8","01060000206A08000001000000010300000001000000050000007041F528CB332C413B509BE9710A594134371E05CC332C4111F40B87720A594147E56566CD332C4198DF5D7F720A594185EF3C8ACC332C41C03BEDE1710A59417041F528CB332C413B509BE9710A5941",null,"default_layer_style"]}]}

这在消费者应用程序中很有用,这绝对依赖于 Kafka Message 内容本身,只是不确定谁在转换这个值,是 Kafka 还是 DBZ/Connect。

apache-kafka gis geo debezium
2个回答
1
投票

我认为这只是在 PostGIS 和 JSON 中表示二进制列的不同方式。 WKB 是一个二进制字段,这意味着它具有具有任意值的字节,其中许多没有相应的可打印字符。 PostGIS 使用十六进制编码将其打印出来,因此它看起来像“01060000208A7A...” - 十六进制数字,但在内部它只是字节。 Kafka 的 JSON 使用 BASE64 编码来编码完全相同的二进制消息。

让我们用字符串的前缀进行测试,

select to_base64(from_hex('01060000206A080000010000000103000000010000000500'))

AQYAACBqCAAAAQAAAAEDAAAAAQAAAAUA

0
投票
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKBReader;
import javax.xml.bind.DatatypeConverter;

...
        io.debezium.data.geometry.Geometry destinationLocation = debeziumRide.getDestinationlocation();
        ByteBuffer buffer = destinationLocation.getWkb();
        byte[] byteArray = buffer.array();
        String hexString = DatatypeConverter.printHexBinary(byteArray);
        byte[] bytes = DatatypeConverter.parseHexBinary(hexString);

        Point destinationPoint = (Point) reader.read(bytes);

这个设置适合我

这是我的 PostgreSQL DDL(和一些预先设置)

CREATE SCHEMA IF NOT EXISTS taxi_booking;
CREATE EXTENSION postgis;
UPDATE pg_extension SET extrelocatable = TRUE WHERE extname = 'postgis';
ALTER EXTENSION postgis SET SCHEMA taxi_booking;
ALTER DATABASE ride_db SET search_path TO taxi_booking;

CREATE TABLE taxi_booking.rides
(
    id                     varchar(255)                       NOT NULL,
    destinationlocation    taxi_booking.geometry(point, 4326) NOT NULL,
    pickupplacelocation    taxi_booking.geometry(point, 4326) NOT NULL,
    CONSTRAINT rides_pkey PRIMARY KEY (id)
);
© www.soinside.com 2019 - 2024. All rights reserved.