Unable to insert data into Greenplum using gpkafka


I am running into a problem using gpkafka to read data from Kafka and insert it into a Greenplum database. My target table has multiple columns of different data types, and the external table that gpkafka creates is a replica of my target table. However, when the data is pushed to the target table, I get this error:

DEBUG, rolling back batch 0 because the batch could not be executed: pq: avro_import: only single json column is supported

The Greenplum version I am using is 6.19.3. Details follow. How can I fix this?

The Kafka message, JSON-encoded against an Avro schema:

{
    "key1": "try81",
    "col1": {
        "int": 1
    },
    "col2": {
        "string": "def"
    },
    "col3": {
        "string": "ghi"
    },
    "col4": {
        "string": "jkl"
    },
    "instance_id": {
        "string": "009"
    }
}
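
The {"int": ...} and {"string": ...} wrappers are how Avro's JSON encoding represents union values, so the registered schema presumably declares the nullable fields as ["null", type] unions. A rough reconstruction (the actual registered schema is not shown in the question) might be:

{
    "type": "record",
    "name": "testgpkafka",
    "fields": [
        {"name": "key1", "type": "string"},
        {"name": "col1", "type": ["null", "int"]},
        {"name": "col2", "type": ["null", "string"]},
        {"name": "col3", "type": ["null", "string"]},
        {"name": "col4", "type": ["null", "string"]},
        {"name": "instance_id", "type": ["null", "string"]}
    ]
}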

DDL of the table:

CREATE TABLE
    testgpkafka
    (
        key1 CHARACTER VARYING(5) NOT NULL,
        col1 INTEGER,
        col2 CHARACTER VARYING(55),
        col3 CHARACTER VARYING(19),
        col4 CHARACTER VARYING(13),
        instance_id CHARACTER VARYING(15),
        PRIMARY KEY (key1)
    );

The gpkafka.yaml file:

DATABASE: gpdb_dev
USER: --
PASSWORD: --
HOST: --
PORT: --
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.151.201:9092, 192.168.151.202:9092, 192.168.151.203:9092
        TOPIC: mcp_kafka_net_21.mcp.testgpkafka
        PARTITIONS: (0)
      COLUMNS:
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://192.168.151.201:8081   
   OUTPUT:
      SCHEMA: smi
      TABLE:  testgpkafka
      MODE:   insert
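
For reference, the load would then be started with the gpkafka client, assuming the config above is saved as gpkafka.yaml:

gpkafka load gpkafka.yaml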

I also tried the json format type, but got this error:

DEBUG, rolling back batch 0 because the batch could not be executed: pq: json_import: only single json / jsonb / gp_jsonb column is supported

Here is the Kafka message:

{
    "key1": "tr159",
    "col1": 1,
    "col2": "def",
    "col3": "ghi",
    "col4": "jkl",
    "instance_id": "009"
}

I also tried reshaping the Kafka message into a single json column, but that did not work either.

Kafka message:

{
    "data": {
        "key1": "tr150",
        "col1": 1,
        "col2": "def",
        "col3": "ghi",
        "col4": "jkl",
        "instance_id": "009"
    }
}
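
Both errors point to the same constraint: with FORMAT avro or json and no per-column definition, gpkafka can only land the entire message in a single json-typed destination column, so reshaping the message does not help while the target table still has six typed columns. A single-column staging table would satisfy that constraint (a hypothetical workaround, not part of the original setup):

CREATE TABLE
    testgpkafka_stage
    (
        jdata JSON
    );

The answer below avoids the extra table by declaring a single json VALUE column and expanding it into the target columns with MAPPING.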
Tags: apache-kafka apache-kafka-streams apache-kafka-connect greenplum
1 Answer

The plain insert mode is a newer feature. MAPPING is more stable.

Something like this:

DATABASE: gpdb_dev
USER: --
PASSWORD: --
HOST: --
PORT: --
VERSION: 2 # the VALUE/MAPPING blocks belong to version 2 of the gpkafka config format
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.151.201:9092, 192.168.151.202:9092, 192.168.151.203:9092
        TOPIC: mcp_kafka_net_21.mcp.testgpkafka
        PARTITIONS: (0)
      VALUE:
        COLUMNS:
          - NAME: jdata
            TYPE: gp_json # custom type from the dataflow EXTENSION in Greenplum; plain "json" also works, but it crashes on the null character "\u0000"
        FORMAT: avro
        AVRO_OPTION:
          SCHEMA_REGISTRY_ADDR: http://192.168.151.201:8081
   OUTPUT:
      SCHEMA: smi
      TABLE: testgpkafka
      MAPPING: 
        - NAME: key1
          EXPRESSION: (jdata->>'key1')::varchar(5)
        - NAME: col1
          EXPRESSION: (jdata->'col1'->>'int')::integer
        - NAME: col2
          EXPRESSION: (jdata->'col2'->>'string')::varchar(55) 
        - NAME: col3
          EXPRESSION: (jdata->'col3'->>'string')::varchar(19) 
        - NAME: col4
          EXPRESSION: (jdata->'col4'->>'string')::varchar(13) 
        - NAME: instance_id
          EXPRESSION: (jdata->'instance_id'->>'string')::varchar(15)
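
The union-unwrapping expressions can be spot-checked in psql against a literal built from the sample message (a hypothetical ad-hoc query, not part of the load itself):

SELECT
    (j->>'key1')::varchar(5)            AS key1,
    (j->'col1'->>'int')::integer        AS col1,
    (j->'col2'->>'string')::varchar(55) AS col2
FROM (SELECT '{"key1": "try81", "col1": {"int": 1}, "col2": {"string": "def"}}'::json AS j) s;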

Or, for the plain JSON message without the union wrappers, drop that last unwrapping step, like this:

      MAPPING: 
        - NAME: key1
          EXPRESSION: (jdata->>'key1')::varchar(5)
        - NAME: col1
          EXPRESSION: (jdata->>'col1')::integer
        - NAME: col2
          EXPRESSION: (jdata->>'col2')::varchar(55) 
        - NAME: col3
          EXPRESSION: (jdata->>'col3')::varchar(19) 
        - NAME: col4
          EXPRESSION: (jdata->>'col4')::varchar(13) 
        - NAME: instance_id
          EXPRESSION: (jdata->>'instance_id')::varchar(15)
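
The same kind of spot check works for the plain-JSON shape (hypothetical):

SELECT
    ('{"key1": "tr159", "col1": 1}'::json->>'key1')::varchar(5) AS key1,
    ('{"key1": "tr159", "col1": 1}'::json->>'col1')::integer    AS col1;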