Exception when deserializing Avro data using the Confluent Schema Registry?


I am new to Flink and Kafka. I am trying to deserialize Avro data using the Confluent Schema Registry. I have installed Flink and Kafka on an EC2 machine, and the "test" topic was created before running the code.

Code: https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2

The code does the following:

1) Create a Flink DataStream from a list of User elements (User is an Avro-generated class).
2) Write the DataStream to Kafka using AvroSerializationSchema.
3) Read the data back from Kafka using ConfluentRegistryAvroDeserializationSchema, which fetches the schema from the Confluent Schema Registry (see the sketch after this list).
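
For reference, a minimal sketch of how step 3 could be wired up in Java with the Flink 1.7-era APIs (I have not inspected the gist, so the broker address, consumer group, registry URL and the use of the universal FlinkKafkaConsumer are assumptions; User is the Avro-generated class from the schema shown further down):

import java.util.Properties;

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ConfluentRegistryConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.setProperty("group.id", "user-consumer");           // assumption: any group id

        // The reader schema comes from User; the writer schema is looked up in the
        // Confluent Schema Registry at this URL (assumption: default local port).
        String schemaRegistryUrl = "http://localhost:8081";

        FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
                "test",
                ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, schemaRegistryUrl),
                props);

        DataStream<User> users = env.addSource(consumer);
        users.print();

        env.execute("confluent-registry-consumer");
    }
}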

Command used to run the Flink job jar:

./bin/flink run -c com.streaming.example.ConfluentSchemaRegistryExample /opt/flink-1.7.2/kafka-flink-stream-processing-assembly-0.1.jar

Exception thrown while running the code:

java.io.IOException: Unknown data format. Magic number does not match
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:55)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:66)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

The Avro schema I am using for the User class is as follows:

{
  "type": "record",
  "name": "User",
  "namespace": "com.streaming.example",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "favorite_color",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

Can someone point out which steps I am missing to deserialize Avro data using the Confluent Kafka Schema Registry?

apache-kafka apache-flink avro confluent confluent-schema-registry
1 Answer

The way you write the Avro data also needs to go through the registry in order for the registry-based deserializer to work: the Confluent format prefixes every record with a magic byte and a 4-byte schema id, which is exactly the check behind the "Magic number does not match" error.

However, adding a ConfluentRegistryAvroSerializationSchema to Flink is still an open PR.

The workaround, I believe, would be to use AvroDeserializationSchema, which does not depend on the registry.
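
A minimal sketch of that workaround, assuming the records on the topic were written with Flink's plain Avro serialization (raw Avro bytes, no Confluent wire format); the helper method and the Kafka properties are illustrative only:

import java.util.Properties;

import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RegistryFreeConsumerSketch {

    // Reads User records as plain Avro bytes, so no Schema Registry lookup
    // (and no magic-byte check) is involved on the consumer side.
    static FlinkKafkaConsumer<User> buildConsumer(Properties kafkaProps) {
        return new FlinkKafkaConsumer<>(
                "test", // topic name from the question
                AvroDeserializationSchema.forSpecific(User.class),
                kafkaProps);
    }
}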

If you do want to use the registry in the producer code, then you will have to do that outside of Flink until that PR is merged.
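
For example, such a producer could use the plain Kafka client together with Confluent's KafkaAvroSerializer (from the io.confluent:kafka-avro-serializer artifact), which registers the schema and writes the magic byte plus schema id that the Flink deserializer expects. The broker/registry addresses and the sample record below are assumptions:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConfluentProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // assumption: local broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // assumption: default port

        try (KafkaProducer<String, User> producer = new KafkaProducer<>(props)) {
            // User is the Avro-generated class; the field values here are just examples.
            User user = new User("alice", 7, "blue");
            producer.send(new ProducerRecord<>("test", user.getName().toString(), user));
        }
    }
}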
