MSSQL值:列名=prop --> value= 100和列名=role --> value= [{"角色": "演员"},{"角色": "导演"}]NOTE: 列名:角色以json格式保存。
从kafka主题读取。
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"prop"
},
{
"type":"string",
"optional":true,
"field":"roles"
}
],
"optional":false
},
"payload":{ "prop":100, "roles":"[{"role":"actor"},{"role":"director"}]"}
失败的原因是 。
Error was [{"type":"mapper_parsing_exception","reason":"object mapping for [roles] tried to parse field [roles] as object, but found a concrete value"}
失败原因是连接器无法为角色创建模式为数组。
上述输入消息是由汇流JdbcSourceConnector创建的,使用的sink连接器是汇流ElasticsearchSinkConnector。
配置详情 :
name=prop-test
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=<elasticseach url>
tasks.max=1
topics=test_prop
type.name=prop
#transforms=InsertKey, ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=prop
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=prop
name=test_prop_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://*.*.*.*:1433;instance=databaseName=test;
connection.user=*****
connection.password=*****
query=EXEC <store proc>
mode=bulk
batch.max.rows=2000000
topic.prefix=test_prop
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=prop
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=prop
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
需要了解如何明确地使模式为ARRAY的角色,而不是一个字符串。
事实上,jdbc Source连接器总是将columnname看作是fieldname,columnvalue看作是value。这种从字符串到数组的转换在现有的jdbc源连接器支持下是不可能的,必须通过自定义转换或自定义插件才能实现。
从MSSQL中获取数据并将其插入到Elastic Search中的最佳选择是使用logstash。它有丰富的过滤插件,可以使数据从MSSQL以任何所需的格式流向任何所需的JSON输出环境(logstashkafka主题)。
流程。MSSQL --> logstash --> Kakfa Topic --> Kafka Elastic sink connector --> Elastic Search