卡夫卡连接,得到JSON模式的JsonConverter

问题描述 投票:1回答:1

我试图建立卡夫卡连接器与一个自定义值转换器。

我使用的是卡夫卡转移连载节俭的对象。 我想建立被反序列化节俭的消息,将它们转换为JSON和发送到elasticsearch卡夫卡连接器。

方法org.apache.kafka.connect.storage.Converter#toConnectData返回SchemaAndValue,需要org.apache.kafka.connect.data.Schema。 我如何获得这个架构的json?

我试过到目前为止:

我试图扩大org.apache.kafka.connect.json.JsonConverter,但它有它自己的模式从什么地方来了。

我尝试使用这个库生成模式:https://github.com/reinert/JJSchema,但JsonConverter似乎有它自己的格式:它期望的,而不是mapobject 请参阅:https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L408

尽管我在配置有禁用方案("value.converter.schemas.enable":"false"),连接器仍然会崩溃,并抱怨架构。这哪里是架构是从哪里来的?它们是如何产生的呢?

我会写一个方法,它是递归重命名所有“错误”的事情在JSON模式,但是这太尴尬。是否有适当的解决办法?

UPD:我的配置是

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "mytopic",
    "key.ignore": "true",
    "connection.url": "https://my-elastic:443",
    "type.name": "event",
    "elasticsearch.index.prefix" : "kafka",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "com.example.ThriftToJsonDeserializer",
    "value.converter.schemas.enable":"false"
}
json elasticsearch apache-kafka thrift apache-kafka-connect
1个回答
2
投票

问题是,Elasticsearch连接器试图推断映射根据消息模式elasticsearch。消息模式由Converter创建并由Transforms修改。如果设置value.converter.schemas.enablefalse您的记录模式是空值。

你必须设置schema.ignore为true,Elasticsearch连接器将无法推断架构。

© www.soinside.com 2019 - 2024. All rights reserved.