Kafka consumer cannot subscribe to a Kafka topic (running via Spark Streaming)

Problem description (votes: 0, answers: 2)

Code that sets up the consumer after creating the props object:

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))

The imports are as follows:

package main.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import java.io.IOException

I created an assembly jar via sbt:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0-kafka-2.1.1"
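For context (this is background, not from the original question): spark-streaming-kafka 1.6.0 is built against Kafka 0.8.2.x, so mixing it with a 0.10.x kafka artifact is a plausible source of the NoSuchMethodError, because subscribe(java.util.Collection) only exists in kafka-clients 0.10+, and an older kafka-clients jar may win on the runtime classpath. A minimal build.sbt sketch that keeps the versions consistent (versions illustrative, not verified against the asker's cluster):

```scala
// build.sbt sketch -- let spark-streaming-kafka pull in the Kafka version
// it was built against, instead of pinning a newer kafka_2.10 artifact.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  // spark-streaming-kafka 1.6.0 transitively brings Kafka 0.8.2.x;
  // adding a separate 0.10.x kafka dependency can cause exactly this
  // NoSuchMethodError at runtime.
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
)
```

If the 0.10 consumer API is genuinely needed, the whole stack (Spark's Kafka integration and kafka-clients) has to agree on a 0.10-compatible version rather than mixing lines.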

What am I missing here?

Error message:

User class threw exception: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

apache-kafka spark-streaming kafka-consumer-api sbt-assembly
2 Answers

0 votes

I ran into the same problem with Spark 2.2.0 and Kafka 0.10.0. The problem is caused by the default Kafka version used by spark2-submit (and spark2-shell as well).

I found the solution here:

1. Before running spark2-submit, you have to export the Kafka version:
$ export SPARK_KAFKA_VERSION=0.10
$ spark2-submit ...

-2 votes

subscribe accepts an input of type java.util.Collection, not java.util.Arrays.asList.

Try:

consumer.subscribe(java.util.Arrays.asList("topic"))

It should work...
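A note on this answer (added for clarity, not part of the original): java.util.Arrays.asList already returns a java.util.List, which is a java.util.Collection, so the call site compiles fine either way. The NoSuchMethodError points at a compile-time vs. runtime kafka-clients mismatch rather than the argument type. For reference, a self-contained sketch of the consumer setup the question describes (broker address, group id, and topic name are placeholders):

```scala
import java.util.{Arrays => JArrays, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")              // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
// Arrays.asList returns a java.util.List (a java.util.Collection), so this
// compiles against 0.9 and 0.10 clients alike; the NoSuchMethodError only
// appears when the runtime kafka-clients jar is older than the compile-time one.
consumer.subscribe(JArrays.asList("topic")) // "topic" is a placeholder
```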
