Apache Flink: reading data from Kafka as a byte array

Question · Votes: 1 · Answers: 2

How can I read data from Kafka as `byte[]`?

I have an implementation that reads events as `String` using `SimpleStringSchema()`, but I can't find a schema that reads the data as `byte[]`.

Here is my code:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "kafka1:9092");
    properties.setProperty("zookeeper.connect", "zookeeper1:2181");
    properties.setProperty("group.id", "test");
    properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    properties.setProperty("auto.offset.reset", "earliest");
    DataStream<byte[]> stream = env
                .addSource(new FlinkKafkaConsumer010<byte[]>("testStr", ?, properties));
deserialization apache-flink kafka-consumer-api flink-streaming
2 Answers
1 vote

I finally found it:

DataStream<byte[]> stream = env
            .addSource(new FlinkKafkaConsumer010<>("testStr", new AbstractDeserializationSchema<byte[]>() {
                @Override
                public byte[] deserialize(byte[] bytes) throws IOException {
                    return bytes;
                }
            }, properties));
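The schema above is a pure identity function: Flink hands it the raw Kafka record value and it returns the same bytes untouched. That pass-through can be exercised outside a Flink runtime with a minimal sketch; note that `ByteSchema` below is a hypothetical local stand-in for the `deserialize` contract, not a Flink class.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class IdentitySchemaSketch {
    // Hypothetical stand-in for the deserialize(byte[]) contract of
    // Flink's AbstractDeserializationSchema (not a real Flink interface).
    interface ByteSchema {
        byte[] deserialize(byte[] message) throws IOException;
    }

    public static void main(String[] args) throws IOException {
        // Identity schema: hand the raw record bytes straight through.
        ByteSchema schema = bytes -> bytes;

        byte[] record = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] out = schema.deserialize(record);
        System.out.println(Arrays.equals(record, out)); // prints "true"
    }
}
```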

-1 votes

For Scala, you would write the following:

new AbstractDeserializationSchema[Array[Byte]](){
     override def deserialize(bytes: Array[Byte]): Array[Byte] = bytes
}