我有一个 Debezium 连接器(在 ksqlDB 服务器上运行)的设置,它将值从 SQL Server CDC 表流式传输到 Kafka 主题。我正在尝试将消息的密钥从 JSON 转换为整数值。我收到的示例密钥如下所示:
{"InternalID":11117}
,我想将其表示为数字11117
。根据 Kafka Connect 文档,使用 ExtractField SMT 应该相当容易。但是,当我配置连接器以使用此转换时,我收到错误Caused by: java.lang.IllegalArgumentException: Unknown field: InternalID
。
连接器配置:
CREATE SOURCE CONNECTOR properties_sql_connector WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname'= 'propertiessql',
'database.port'= '1433',
'database.user'= 'XXX',
'database.password'= 'XXX',
'database.dbname'= 'Properties',
'database.server.name'= 'properties',
'table.exclude.list'= 'dbo.__EFMigrationsHistory',
'database.history.kafka.bootstrap.servers'= 'kafka:9091',
'database.history.kafka.topic'= 'dbhistory.properties',
'key.converter.schemas.enable'= 'false',
'transforms'= 'unwrap,extractField',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.delete.handling.mode'= 'none',
'transforms.extractField.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractField.field'= 'InternalID',
'key.converter'= 'org.apache.kafka.connect.json.JsonConverter');
错误详情:
--------------------------------------------------------------------------------------------------------------------------------------
0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:258)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unknown field: InternalID
at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
... 11 more
对于为什么这种转变失败有什么想法吗?我是否缺少某些配置? 当
extractField
变换被删除时,我的消息的键如下所示:{"InternalID":11117}
默认情况下,当您为任何连接器(包括 Debezium)配置 SMT 时,转换将应用于连接器发出的每条记录。这包括可能没有检索到的数据并且可能没有必要字段的更改事件消息。 为了解决这个问题,您需要有选择地将 SMT 应用于 Debezium 使用 SMT 谓词生成的更改事件消息的特定子集。
官方文档位于这里。
在您的特定情况下,您可以仅将 SMT 应用于该特定数据库表的输出主题,它看起来像这样:
# Create a predicate that matches your output
predicates: topicNameMatch
predicates.topicNameMatch.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.topicNameMatch.pattern: *output topic name goes here*
# Your logic to extract the field from the key
transforms.extractField.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractField.field: InternalID
# This references the predicate above
transforms.extractField.predicate: topicNameMatch
如果主题名称匹配不适合您,上面列出的文档中还有其他谓词。
为了从 JSON 中提取命名字段,您需要
schemas.enable = 'true'
该转换器
对于任何并非源自 Debezium 的数据,都需要 JSON 具有架构作为事件的一部分。
或者,如果您正在使用架构注册表,请切换到使用该注册表的其他转换器,它应该可以工作。
ExtractNewRecordState
从 SMT Envelope 中提取 payload.after
并将该数据作为值。当您需要使用 InternalID
变换将 ValueToKey
从值复制到键时。 ExtractField$Key
从键结构中提取字段,默认情况下该结构只有表的主键,可能不是InternalID
。