我正在尝试使用 Kafka Connect 中的 JDBC 源连接器将数据加载到 ksqlDB 源表中。主题键是从其中两列创建的 JSON 对象。 ksqlDB 无法反序列化密钥并出现错误。是否可以在 ksqlDB 中创建一个源表,其中两列作为从 JSON 键反序列化的主键?
连接器配置为:
CREATE SOURCE CONNECTOR IF NOT EXISTS loadshedding_model_source WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://cockroachdb:26257/db',
'connection.user' = 'user',
'connection.password' = 'password',
'connection.options' = '-c multiple_active_portals_enabled=true',
'topic.prefix' = 'jdbc_',
'table.whitelist' = 'whitelist_table',
'mode' = 'bulk',
'numeric.mapping' = 'best_fit',
"poll.interval.ms" = 1800000,
'transforms' = 'createKey',
'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.createKey.fields' = 'c1,c2',
'topic.creation.default.partitions' = 3,
'topic.creation.default.replication.factor' = 3
);
我从两列“c1”和“c2”创建 Kafka 键(请参阅转换配置)。这工作正常,我可以看到主题中的消息在 for
中有一个 JSON 键{
"c1": "c1v1",
"c2": "c2": "v1"
}
我正在尝试在 ksqDB 中创建一个 SOURCE 表,其主键为 c1 和 c2 机智:
CREATE SOURCE TABLE whitelist_table (
C1 VARCHAR PRIMARY KEY,
C2 VARCHAR PRIMARY KEY,
C3 VARCHAR,
C4 TIMESTAMP,
C5 INT
) WITH (
KAFKA_TOPIC='jdbc_whitelist_table',
KEY_FORMAT='JSON',
VALUE_FORMAT='JSON'
)
ksqlDB 创建表但无法反序列化键。我收到以下错误消息:
ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: jdbc_model_load_shedding. Unrecognized token 'Struct': was expecting ( JSON String, Number, Array, Object or token 'null', 'true' or 'false')","recordB64":null,"cause":["Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"],"topic":"jdbc_whitelist_table"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.CST_WHITELIST_TABLE_11.KsqlTopic.Source.deserializer:44)
如果您看到提及
Struct
令牌/字符串的错误,那么您正在使用 StringConverter
,它不适用于 JSON 数据。您需要在连接器配置中为键/值转换器设置设置JsonConverter