kafka connect - 如何从有效负载中过滤模式元数据

问题描述 投票:0回答:1

我正在尝试从有效负载中删除模式,这是配置

connector.properties

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/employee_db?user=root&password=root
table.whitelist=testemp
mode=incrementing
incrementing.column.name=employee_id
topic.prefix=test-mysql-jdbc-

以下是我的worker.properties

bootstrap.servers=localhost:9092


key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false


internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000
plugin.path=C:\Users\name\Desktop\kafka\libs

输出:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"employee_id"},{"type":"string","optional":false,"field":"first_name"}],"optional":false,"name":"testemp"},"payload":{"employee_id":2,"first_name":"test"}}

例外输出:

{"payload":{"employee_id":2,"first_name":"test"}}

我尝试在here中建议禁用工作中的value.converter.schemas.enable = false仍然没有效果

我错过了什么吗?

apache-kafka apache-kafka-connect
1个回答
1
投票

有两种方法可以解决它:

  • 从连接器配置中删除value.converter属性(使用相同的value.converter
  • 在连接器配置中设置value.converter.schemas.enable=false

架构已添加到消息中,因为您已覆盖值转换器并且未禁用架构(默认情况下启用了JsonConverter架构)。从Kafka Connect的角度来看,您使用了全新的Converter(它不会使用全局配置中的属性)

如果您要禁用架构,您的邮件将如下所示:

{
    "employee_id": 2,
    "first_name":"test"
}
© www.soinside.com 2019 - 2024. All rights reserved.