我可以在非kafka主题上定义架构吗?

问题描述 投票:0回答:1

我将数据和值模式发送到kafka主题,如下所示:

./bin/kafka-avro-console-producer \
  --broker-list 10.0.0.0:9092 --topic orders \
  --property parse.key="true" \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property key.separator="$" \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type":  ["null","int"],"default": null}, {"name":"price","type":  ["null","int"],"default": null}]}' \
  --property schema.registry.url=http://10.0.0.0:8081

然后,我从kafka主题获得了像这样的汇属性一样的汇合数据:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

但是我只想从kafka中获取json而没有value.schema。如果我把kafka主题放这个json数据

{"id":9}${"id": {"int":9}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":61}}

如何从kafka获取此数据,并将oracle与汇合的jdbc接收器放在一起。

我想在融合方面制作模式?

另外一件事是,我可以从一个kafka主题获取两种不同类型的数据,并且在Oracle方面使用汇合的jdbc进入两个不同的表。

预先感谢

apache-kafka apache-kafka-connect confluent-platform
1个回答
0
投票

如果您的源主题包含未声明任何模式的JSON数据,则必须添加该模式,然后才能使用JDBC Sink。

选项包括:

  1. ksqlDB,如此处所示:https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s
  2. Kafka Connect的单个消息转换功能。 Apache Kafka没有提供SMT来执行此操作,但是有prototypes out there可以执行此操作。
  3. [其他流处理,例如Kafka Streams
© www.soinside.com 2019 - 2024. All rights reserved.