使用融合功能在Apache flink中反序列化avro

问题描述 投票:0回答:2

我正在尝试对来自Apache flink上的kafka的Avro消息进行反序列化

我目前正在通过实现DeserializationSchema接口来实现,但是已经废止了,有没有更好的形式来实现这一目标?

谢谢。

这些是我的课程

public final class FlinkKafkaExample {

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // setup Kafka sink
    ConfluentAvroDeserializationSchema deserSchema = new
            ConfluentAvroDeserializationSchema("http://localhost:8081", 1000);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "0.0.0.0:9092");
    kafkaProps.setProperty("zookeeper.connect", "0.0.0.0:2181");
    kafkaProps.setProperty("group.id", "org.apache.flink");
    FlinkKafkaConsumer08<String> flinkKafkaConsumer =
            new FlinkKafkaConsumer08<String>("conekta.public.codes", deserSchema, kafkaProps);

    DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);


    kafkaStream.print();
    env.execute("Flink Kafka Java Example");
}

}

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<String> {

private final String schemaRegistryUrl;
private final int identityMapCapacity;
private KafkaAvroDecoder kafkaAvroDecoder;

public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int identityMapCapacity) {
    this.schemaRegistryUrl = schemaRegistryUrl;
    this.identityMapCapacity = identityMapCapacity;
}


public String deserialize(byte[] message) {
    if (kafkaAvroDecoder == null) {
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
        this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
    }
    return this.kafkaAvroDecoder.fromBytes(message).toString();
}


public boolean isEndOfStream(String nextElement) {
    return false;
}
public TypeInformation<String> getProducedType() {
    return BasicTypeInfo.STRING_TYPE_INFO;
}

}

apache-kafka apache-flink avro confluent-schema-registry
2个回答

-1
投票
我发现this更直接。它可以Ser / des任何Avro类型。
© www.soinside.com 2019 - 2024. All rights reserved.