假设我有一个主题(用户),该主题具有json数据,但没有模式。
数据示例:
{"id":3151212170,"name":"John Wick"}
为了解决这个问题,我创建了一个流(user_stream),以从主题中获取数据并基本上为其提供一个架构。
create stream user_stream (id bigint, name string) with (kafka_topic='user', value_format='JSON', key = 'id');
然后使用我创建的另一个流的数据:
create stream user_final with (value_format = 'AVRO') as select * from USER_STREAM;
注意:数据现在是具有模式的Avro格式,但列现在是大写。
我正在使用Kafka的JdbcSinkConnector将数据接收到已经存在的Postgres表中。
Postgres表示例:
create table mytable (id bigint primary key, name text)
接收器连接器配置:
{
"name": "postgres-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "USER_FINAL",
"key.converter.schema.registry.url": "http://schema-reg-url:8081",
"value.converter.schema.registry.url": "http://schema-reg-url:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"connection.url": "jdbc:postgresql://postgres-url:5432/mydbname?user=username&password=password",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"table.name.format": "mytable",
"pk.mode": "none",
"insert.mode": "insert"
}
}
问题是流列名称在大写中,而Postgres表的列名称在小写中。即使我在Postgres中使用大写的列名创建表,它也将转换为小写。
错误:
org.postgresql.util.PSQLException: ERROR: column "ID" of relation "mytable" does not exist
是否有解决方法?我愿意接受建议。
即使我在Postgres中用大写的列名创建了一个表,只是转换为小写。
如果您的列名未放在双引号中,则PostgreSQL会将其转换为小写格式。
因此,
在Kafka Connect端,您可以使用Kafka Connect Single Message Transofrms (SMT)更改字段名称。
更确切地说,ReplaceField
使您可以重命名字段。例如,以下转换将把列名ReplaceField
和COL1
分别替换为COL2
和col1
:
col2