我将数据和值模式发送到kafka主题,如下所示:
./bin/kafka-avro-console-producer \
--broker-list 10.0.0.0:9092 --topic orders \
--property parse.key="true" \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator="$" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type": ["null","int"],"default": null}, {"name":"price","type": ["null","int"],"default": null}]}' \
--property schema.registry.url=http://10.0.0.0:8081
然后,我从kafka主题获得了像这样的汇属性一样的汇合数据:
{
"name": "jdbc-oracle",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "orders",
"connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
"connection.user": "ersin",
"connection.password": "ersin!",
"auto.create": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "id",
"insert.mode": "upsert",
"plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
"name": "jdbc-oracle"
},
"tasks": [
{
"connector": "jdbc-oracle",
"task": 0
}
],
"type": "sink"
}
但是我只想从kafka中获取json而没有value.schema。如果我把kafka主题放这个json数据
{"id":9}${"id": {"int":9}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":61}}
如何从kafka获取此数据,并将oracle与汇合的jdbc接收器放在一起。
我想在融合方面制作模式?
另外一件事是,我可以从一个kafka主题获取两种不同类型的数据,并且在Oracle方面使用汇合的jdbc进入两个不同的表。
预先感谢
如果您的源主题包含未声明任何模式的JSON数据,则必须添加该模式,然后才能使用JDBC Sink。
选项包括: