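I'm having trouble using GPKAFKA to read data from KAFKA and insert it into a GreenPlum database. My target table has several columns of different data types, and the external table that GPKAFKA creates is a copy of the target table. However, when the data is pushed to the target table I get this error:
DEBUG, rolling back batch 0 because the batch could not be executed: pq: avro_import: only a single json column is supported
I'm using GreenPlum database version 6.19.3. Details are below. How can I solve this?
KAFKA message in JSON format with an Avro schema: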
{
  "key1": "try81",
  "col1": {
    "int": 1
  },
  "col2": {
    "string": "def"
  },
  "col3": {
    "string": "ghi"
  },
  "col4": {
    "string": "jkl"
  },
  "instance_id": {
    "string": "009"
  }
}
Table DDL:
CREATE TABLE testgpkafka
(
    key1        CHARACTER VARYING(5) NOT NULL,
    col1        INTEGER,
    col2        CHARACTER VARYING(55),
    col3        CHARACTER VARYING(19),
    col4        CHARACTER VARYING(13),
    instance_id CHARACTER VARYING(15),
    PRIMARY KEY (key1)
);
gpkafka.yaml file:
DATABASE: gpdb_dev
USER: --
PASSWORD: --
HOST: --
PORT: --
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: 192.168.151.201:9092, 192.168.151.202:9092, 192.168.151.203:9092
      TOPIC: mcp_kafka_net_21.mcp.testgpkafka
      PARTITIONS: (0)
    COLUMNS:
    FORMAT: avro
    AVRO_OPTION:
      SCHEMA_REGISTRY_ADDR: http://192.168.151.201:8081
  OUTPUT:
    SCHEMA: smi
    TABLE: testgpkafka
    MODE: insert
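I also tried the json format type, but got this error:
DEBUG, rolling back batch 0 because the batch could not be executed: pq: json_import: only a single json / jsonb / gp_jsonb column is supported
Here is the kafka message: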
{
  "key1": "tr159",
  "col1": 1,
  "col2": "def",
  "col3": "ghi",
  "col4": "jkl",
  "instance_id": "009"
}
I also tried wrapping the KAFKA message in a single json column, but that didn't work either.
Kafka message:
{
  "data": {
    "key1": "tr150",
    "col1": 1,
    "col2": "def",
    "col3": "ghi",
    "col4": "jkl",
    "instance_id": "009"
  }
}
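MODE: insert is a newer feature. Loading the whole message into a single json column and filling the target columns through a MAPPING is more stable.
Something like this: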
DATABASE: gpdb_dev
USER: --
PASSWORD: --
HOST: --
PORT: --
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: 192.168.151.201:9092, 192.168.151.202:9092, 192.168.151.203:9092
      TOPIC: mcp_kafka_net_21.mcp.testgpkafka
      PARTITIONS: (0)
    VALUE:
      COLUMNS:
        - NAME: jdata
          TYPE: gp_json # custom type from the dataflow EXTENSION in GreenPlum; you can also use type "json", but it crashed for me on the null char "\0000"
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://192.168.151.201:8081
  OUTPUT:
    SCHEMA: smi
    TABLE: testgpkafka
    MAPPING:
      - NAME: key1
        EXPRESSION: (jdata->>'key1')::varchar(5)
      - NAME: col1
        EXPRESSION: (jdata->'col1'->>'int')::integer
      - NAME: col2
        EXPRESSION: (jdata->'col2'->>'string')::varchar(55)
      - NAME: col3
        EXPRESSION: (jdata->'col3'->>'string')::varchar(19)
      - NAME: col4
        EXPRESSION: (jdata->'col4'->>'string')::varchar(13)
      - NAME: instance_id
        EXPRESSION: (jdata->'instance_id'->>'string')::varchar(15)
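Or, if the values are not wrapped in Avro type objects (as in your plain JSON message), without the last extraction step, like this: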
MAPPING:
  - NAME: key1
    EXPRESSION: (jdata->>'key1')::varchar(5)
  - NAME: col1
    EXPRESSION: (jdata->>'col1')::integer
  - NAME: col2
    EXPRESSION: (jdata->>'col2')::varchar(55)
  - NAME: col3
    EXPRESSION: (jdata->>'col3')::varchar(19)
  - NAME: col4
    EXPRESSION: (jdata->>'col4')::varchar(13)
  - NAME: instance_id
    EXPRESSION: (jdata->>'instance_id')::varchar(15)
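If it helps to see what the two MAPPING variants are meant to pull out of each message shape, here is a rough Python sketch. It only imitates the `->` / `->>` extraction and the `::integer` cast on the two sample messages from the question; it is not how GPKAFKA evaluates expressions internally. The names `arrow_text`, `to_row` and `COLUMNS` are made up for this illustration.

```python
import json

def arrow_text(doc, *path):
    """Hypothetical stand-in for jdata->'a'->>'b': follow the keys, return text or None."""
    for key in path:
        if not isinstance(doc, dict) or key not in doc:
            return None  # a missing key gives NULL in SQL
        doc = doc[key]
    if doc is None:
        return None
    # ->> returns strings unquoted and any other value as its JSON text
    return doc if isinstance(doc, str) else json.dumps(doc)

# Target column name and the Avro type wrapper used in the first sample message
# (key1 is not wrapped there, so it has no wrapper).
COLUMNS = [("key1", None), ("col1", "int"), ("col2", "string"),
           ("col3", "string"), ("col4", "string"), ("instance_id", "string")]

def to_row(msg, avro_wrapped):
    """Build one target row, with or without the extra ->>'type' step."""
    row = {}
    for name, avro_type in COLUMNS:
        path = (name, avro_type) if avro_wrapped and avro_type else (name,)
        row[name] = arrow_text(msg, *path)
    if row["col1"] is not None:
        row["col1"] = int(row["col1"])  # mirrors the ::integer cast
    return row

avro_msg = json.loads("""{"key1": "try81", "col1": {"int": 1},
  "col2": {"string": "def"}, "col3": {"string": "ghi"},
  "col4": {"string": "jkl"}, "instance_id": {"string": "009"}}""")
plain_msg = json.loads("""{"key1": "tr159", "col1": 1, "col2": "def",
  "col3": "ghi", "col4": "jkl", "instance_id": "009"}""")

avro_row = to_row(avro_msg, avro_wrapped=True)     # first MAPPING variant
plain_row = to_row(plain_msg, avro_wrapped=False)  # second MAPPING variant
print(avro_row)
print(plain_row)
```

Apart from key1, both rows come out with the same column values, which is the point: the extra `->>'int'` / `->>'string'` step is only needed when the message carries the Avro type wrappers.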