Kafka在scala中连接avro消费者

问题描述 投票:0回答:1

我有一个使用kafka connect的生产者,它使用Confluent Kafka Connect API,它以“SourceRecord”格式发布消息,其中包含“schema”和“struct”,如下所示。

我正在寻找一个示例代码来在scala中构建一个kafka使用者,它消耗该消息并将其反序列化为一个对象

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

//publish kafka message in avro format 

    protected SourceRecord makeSourceRecord(AvroDataEvent avroDataEvent) {
        return new SourceRecord(
                partitionKey(config.sourceJdbcUrl),
                config.topicName,
                avroDataEvent.schema(),
                avroDataEvent.struct());
    }
scala apache-kafka-connect confluent
1个回答
0
投票

您可以使用Confluent KafkaAvroDeserializer类以及使用Connector配置的架构注册表直接从config.topicName主题使用

仅仅因为来自Connect的数据不需要使用Connect API来读取它。

关于示例代码,请尝试将其作为起点(在Kotlin中)http://aseigneurin.github.io/2018/08/03/kafka-tutorial-5-consuming-avro.html

© www.soinside.com 2019 - 2024. All rights reserved.