在 Confluence Control Center UI 中生成 Avro 消息

问题描述 投票:0回答:2

要开发数据传输应用程序,我需要首先定义键/值 avro 模式。在定义 avro 模式之前,生产者应用程序尚未开发。

我克隆了一个主题及其键/值 avro 模式,它们已经在工作并且 并且还克隆了 jdbc snink 连接器。我只是更改了主题和连接器名称。

然后我使用 Confluence Topic Message UI Producer 复制现有消息并成功发送接收器。

但它发送错误:“未知的魔法字节!”

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:323)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:164)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
        ... 17 more
[2022-07-25 03:45:42,385] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)

阅读其他问题,似乎必须使用模式序列化消息。

带有 kafka-avro-console-consumer 的未知魔法字节

是否可以使用 Confluence Topic UI 向具有 AVRO 键/值模式的主题发送消息?

知道 avro 模式是否需要取决于连接器/源的信息吗?或者命名空间是否取决于主题名称?

这是我的关键架构。主题名称是knov_03

{
  "connect.name": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming.Key",
  "fields": [
    {
      "name": "id_sap_incoming",
      "type": "long"
    }
  ],
  "name": "Key",
  "namespace": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming",
  "type": "record"
}

连接器:

{
  "name": "knov_05",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "knov_03",
    "connection.url": "jdbc:mysql://eXXXXX:3306/MY_DB_SCHEMA?useSSL=FALSE&nullCatalogMeansCurrent=true",
    "connection.user": "USER",
    "connection.password": "PASSWORD",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id_sap_incoming",
    "auto.create": "true",
    "auto.evolve": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

谢谢。

apache-kafka avro apache-kafka-connect confluent-schema-registry confluent-control-center
2个回答
2
投票

从当前版本的 Confluence Control Center 开始,它不支持直接通过其 UI 发送 Avro 格式的消息。控制中心主要用于管理和监控您的 Kafka 环境。

为了生成 Avro 消息,您通常会使用 Confluence 提供的其他工具,例如 Confluence REST Proxy 或 Kafka Avro Console Producer。这两个工具都允许您将 Avro 消息发送到 Kafka,但它们要求您的 Avro 模式已在 Confluence Schema 注册表中注册。


0
投票

2024年的答案:

我最近遇到了类似的情况,并找到了一种使用 Confluence CLI 发送 Avro 格式消息的便捷方法。

首先你可以按照这个文档安装Confluence CLI,然后配置你的API密钥和秘密:

confluent api-key store your-api-key your-api-secret --resource kafka-cluster-id
confluent api-key use your-api-key
  • your-api-key
    your-api-secret
    替换为生成的值。
  • 将 kafka-cluster-id 替换为您的 Kafka 集群 ID。

在 Kafka 主题选项卡中,转到 Schema 选项卡查找架构 ID(通常是数字)。

然后你可以用标准 JSON 格式编写消息,例如:

{"registertime": 1715086531577, "userid": "someid", "regionid": "UK", "gender": "FEMALE"}

要发送此消息,您只需运行以下命令:

echo '{"registertime": 1715086531577, "userid": "someid", "regionid": "UK", "gender": "FEMALE"}' | \
confluent kafka topic produce your_topic --value-format avro --schema 123456
  • your_topic
    替换为您的 Kafka 主题名称
  • 123456
    替换为您的架构 ID

如果一切正常,您将看到消息:

Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit

现在 Avro 格式的消息应该出现在您的 Kafka 主题中

© www.soinside.com 2019 - 2024. All rights reserved.