我试图建立卡夫卡连接器与一个自定义值转换器。
我使用的是卡夫卡转移连载节俭的对象。 我想建立被反序列化节俭的消息,将它们转换为JSON和发送到elasticsearch卡夫卡连接器。
方法org.apache.kafka.connect.storage.Converter#toConnectData
返回SchemaAndValue
,需要org.apache.kafka.connect.data.Schema
。
我如何获得这个架构的json?
我试过到目前为止:
我试图扩大org.apache.kafka.connect.json.JsonConverter
,但它有它自己的模式从什么地方来了。
我尝试使用这个库生成模式:https://github.com/reinert/JJSchema,但JsonConverter
似乎有它自己的格式:它期望的,而不是map
等object
请参阅:https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L408
尽管我在配置有禁用方案("value.converter.schemas.enable":"false"
),连接器仍然会崩溃,并抱怨架构。这哪里是架构是从哪里来的?它们是如何产生的呢?
我会写一个方法,它是递归重命名所有“错误”的事情在JSON模式,但是这太尴尬。是否有适当的解决办法?
UPD:我的配置是
{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "mytopic",
"key.ignore": "true",
"connection.url": "https://my-elastic:443",
"type.name": "event",
"elasticsearch.index.prefix" : "kafka",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "com.example.ThriftToJsonDeserializer",
"value.converter.schemas.enable":"false"
}
问题是,Elasticsearch连接器试图推断映射根据消息模式elasticsearch。消息模式由Converter
创建并由Transforms
修改。如果设置value.converter.schemas.enable
上false
您的记录模式是空值。
你必须设置schema.ignore
为true,Elasticsearch连接器将无法推断架构。