ksqlDB 无法反序列化 JSON 密钥

问题描述 投票:0回答:1

我正在尝试使用 Kafka Connect 中的 JDBC 源连接器将数据加载到 ksqlDB 源表中。主题键是从其中两列创建的 JSON 对象。 ksqlDB 无法反序列化密钥并出现错误。是否可以在 ksqlDB 中创建一个源表,其中两列作为从 JSON 键反序列化的主键?

连接器配置为:

CREATE SOURCE CONNECTOR IF NOT EXISTS loadshedding_model_source WITH (
  'connector.class'             = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'              = 'jdbc:postgresql://cockroachdb:26257/db',
  'connection.user'             = 'user',
  'connection.password'         = 'password',
  'connection.options'          = '-c multiple_active_portals_enabled=true',
  'topic.prefix'                = 'jdbc_',
  'table.whitelist'             = 'whitelist_table',
  'mode'                        = 'bulk',
  'numeric.mapping'             = 'best_fit',
  "poll.interval.ms"            = 1800000,
  'transforms'                  = 'createKey',
  'transforms.createKey.type'   = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'c1,c2',
  'topic.creation.default.partitions' = 3,
  'topic.creation.default.replication.factor' = 3
);

我从两列“c1”和“c2”创建 Kafka 键(请参阅转换配置)。这工作正常,我可以看到主题中的消息在 for

中有一个 JSON 键
{
  "c1": "c1v1",
  "c2": "c2": "v1"
}

我正在尝试在 ksqDB 中创建一个 SOURCE 表,其主键为 c1 和 c2 机智:

CREATE SOURCE TABLE whitelist_table (
  C1 VARCHAR PRIMARY KEY,
  C2 VARCHAR PRIMARY KEY,
  C3 VARCHAR,
  C4 TIMESTAMP,
  C5 INT
) WITH (
  KAFKA_TOPIC='jdbc_whitelist_table',
  KEY_FORMAT='JSON',
  VALUE_FORMAT='JSON'
)

ksqlDB 创建表但无法反序列化键。我收到以下错误消息:

ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: jdbc_model_load_shedding. Unrecognized token 'Struct': was expecting ( JSON String, Number, Array, Object or token 'null', 'true' or 'false')","recordB64":null,"cause":["Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"],"topic":"jdbc_whitelist_table"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.CST_WHITELIST_TABLE_11.KsqlTopic.Source.deserializer:44)
apache-kafka apache-kafka-connect ksqldb
1个回答
0
投票

如果您看到提及

Struct
令牌/字符串的错误,那么您正在使用
StringConverter
,它不适用于 JSON 数据。您需要在连接器配置中为键/值转换器设置设置
JsonConverter

© www.soinside.com 2019 - 2024. All rights reserved.