从MSSQL vis kafka连接器到弹性搜索的嵌套类型的批量数据更新失败。

问题描述 投票:0回答:1

MSSQL值:列名=prop --> value= 100和列名=role --> value= [{"角色": "演员"},{"角色": "导演"}]NOTE: 列名:角色以json格式保存。

从kafka主题读取。

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"prop"
         },
         {
            "type":"string",
            "optional":true,
            "field":"roles"
         }
 ],
      "optional":false
   },
   "payload":{ "prop":100, "roles":"[{"role":"actor"},{"role":"director"}]"}

失败的原因是 。

Error was [{"type":"mapper_parsing_exception","reason":"object mapping for [roles] tried to parse field [roles] as object, but found a concrete value"}

失败原因是连接器无法为角色创建模式为数组。

上述输入消息是由汇流JdbcSourceConnector创建的,使用的sink连接器是汇流ElasticsearchSinkConnector。

配置详情 :

水槽配置。

name=prop-test
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=<elasticseach url>

tasks.max=1

topics=test_prop
type.name=prop

#transforms=InsertKey, ExtractId

transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=prop

transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=prop

源配置。

name=test_prop_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://*.*.*.*:1433;instance=databaseName=test;
connection.user=*****
connection.password=*****
query=EXEC <store proc>
mode=bulk
batch.max.rows=2000000
topic.prefix=test_prop
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=prop

transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=prop

connect-standalone.properties:

    bootstrap.servers=localhost:9092

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

需要了解如何明确地使模式为ARRAY的角色,而不是一个字符串。

elasticsearch apache-kafka apache-kafka-connect confluent
1个回答
0
投票

事实上,jdbc Source连接器总是将columnname看作是fieldname,columnvalue看作是value。这种从字符串到数组的转换在现有的jdbc源连接器支持下是不可能的,必须通过自定义转换或自定义插件才能实现。

从MSSQL中获取数据并将其插入到Elastic Search中的最佳选择是使用logstash。它有丰富的过滤插件,可以使数据从MSSQL以任何所需的格式流向任何所需的JSON输出环境(logstashkafka主题)。

流程。MSSQL --> logstash --> Kakfa Topic --> Kafka Elastic sink connector --> Elastic Search

© www.soinside.com 2019 - 2024. All rights reserved.