我正在使用spark从主题kafka获取数据。我必须使用KafkaAvroDeserialaizer对avro数据进行deserialaizer。我配置kafka消费者:
kafkaParams.put("bootstrap.servers", "10.0.4.215:9092");
kafkaParams.put("key.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
kafkaParams.put("value.deserializer",io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
// kafkaParams.put("key.convert", com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.class);
//kafkaParams.put("value.convert",com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
但是当我执行代码时,我在线程中有异常Exception
“streaming-start”java.lang.NoClassDefFoundError:io / confluent / common / config / ConfigException
谁能告诉我在哪里可以找到这个类def?例如maven依赖ext。
您需要以下依赖项:group:'io.confluent',name:'common-config',version:yourConfluentVersion
我有同样的问题。我使用了5.1.0
汇合平台版本。我检查了compatibility kafka <-> confluent
,发现有相同级别的兼容性的更新版本。我将版本更新到5.1.1
,它为我解决了这个问题。