Debezium 源连接器,向 `after` 对象添加一个字段

问题描述 投票:0回答:1

我正在 Kafka 中使用 Debezium PostgreSQL 源连接器进行 CDC。下面是我想要推送到 Kafka 的所需消息模式。最值得注意的是

after.source
字段。

after.source
字段是一个静态的、永不改变的字段,源数据库中不存在该字段。由于供应商的支持协议,我无法在源数据库中编辑或创建视图。

{
    "before": null,
    "after": {
      "rid": "3b99c447-65a8-4d6b-bbff-2c33b7944696",
      "cust": 75862,
      "loc": 916719,
      "meter": "A90OC5385040",
      "cosum": "2.06",
      "cosdt": 1673330400000000,
      "costy": "I",
      "source": "C"
    },
    "source": {
      "version": "2.4.2.Final",
      "connector": "postgresql",
      "name": "mmv2_pgami",
      "ts_ms": 1711944632077,
      "snapshot": "false",
      "db": "Pgami_db",
      "sequence": "[null,\"23516504\"]",
      "schema": "public",
      "table": "mreads",
      "txId": 574,
      "lsn": 23516504,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1711944632426,
    "transaction": null
  }

我可以从 Debezium 源连接器中添加嵌入到

source
对象中的
after
字段吗(如上例所示)?

下面是我尝试过的,但它向根添加了一个名为“after.source”的新字段,这是不正确的。

{
    "name": "postgres_connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        // Other configuration properties...

        // Add the following configuration for the AddFields transformation
        "transforms": "addSourceField",
        "transforms.addSourceField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSourceField.static.field": "after.source",
        "transforms.addSourceField.static.value": "C"
    }
}
apache-kafka apache-kafka-connect
1个回答
0
投票

@OneCricketeer 是正确的,因此借鉴他的回答并添加到其中,基本上,我最终使用

ExtractNewRecordState
将规则简化为仅
after
对象。此后我使用重命名转换:

{
    "name": "postgres_connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        // Other configuration properties...

        // Add the following configuration for the AddFields transformation
        "transforms": "unwrap,addSourceField",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.addSourceField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSourceField.static.field":"source",
        "transforms.addSourceField.static.value":"C"
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.