要开发数据传输应用程序,我需要首先定义键/值 avro 模式。在定义 avro 模式之前,生产者应用程序尚未开发。
我克隆了一个主题及其键/值 avro 模式,它们已经在工作并且 并且还克隆了 jdbc snink 连接器。我只是更改了主题和连接器名称。
然后我使用 Confluence Topic Message UI Producer 复制现有消息并成功发送接收器。
但它发送错误:“未知的魔法字节!”
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:323)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:164)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
... 17 more
[2022-07-25 03:45:42,385] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
阅读其他问题,似乎必须使用模式序列化消息。
带有 kafka-avro-console-consumer 的未知魔法字节
是否可以使用 Confluence Topic UI 向具有 AVRO 键/值模式的主题发送消息?
知道 avro 模式是否需要取决于连接器/源的信息吗?或者命名空间是否取决于主题名称?
这是我的关键架构。主题名称是knov_03
{
"connect.name": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming.Key",
"fields": [
{
"name": "id_sap_incoming",
"type": "long"
}
],
"name": "Key",
"namespace": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming",
"type": "record"
}
连接器:
{
"name": "knov_05",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "knov_03",
"connection.url": "jdbc:mysql://eXXXXX:3306/MY_DB_SCHEMA?useSSL=FALSE&nullCatalogMeansCurrent=true",
"connection.user": "USER",
"connection.password": "PASSWORD",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "id_sap_incoming",
"auto.create": "true",
"auto.evolve": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schema.registry.url": "http://schema-registry:8081"
}
}
谢谢。
从当前版本的 Confluence Control Center 开始,它不支持直接通过其 UI 发送 Avro 格式的消息。控制中心主要用于管理和监控您的 Kafka 环境。
为了生成 Avro 消息,您通常会使用 Confluence 提供的其他工具,例如 Confluence REST Proxy 或 Kafka Avro Console Producer。这两个工具都允许您将 Avro 消息发送到 Kafka,但它们要求您的 Avro 模式已在 Confluence Schema 注册表中注册。
2024年的答案:
我最近遇到了类似的情况,并找到了一种使用 Confluence CLI 发送 Avro 格式消息的便捷方法。
首先你可以按照这个文档安装Confluence CLI,然后配置你的API密钥和秘密:
confluent api-key store your-api-key your-api-secret --resource kafka-cluster-id
confluent api-key use your-api-key
your-api-key
和 your-api-secret
替换为生成的值。在 Kafka 主题选项卡中,转到 Schema 选项卡查找架构 ID(通常是数字)。
然后你可以用标准 JSON 格式编写消息,例如:
{"registertime": 1715086531577, "userid": "someid", "regionid": "UK", "gender": "FEMALE"}
要发送此消息,您只需运行以下命令:
echo '{"registertime": 1715086531577, "userid": "someid", "regionid": "UK", "gender": "FEMALE"}' | \
confluent kafka topic produce your_topic --value-format avro --schema 123456
your_topic
替换为您的 Kafka 主题名称123456
替换为您的架构 ID 如果一切正常,您将看到消息:
Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit
。
现在 Avro 格式的消息应该出现在您的 Kafka 主题中