我已经检查了很多其他论坛和帖子,但我似乎无法缩小问题的范围。我一直看到的都是人们说不要使用日志记录,以及它是如何被废弃的,但我甚至不知道我的代码中哪里使用了它。
当我运行下面的代码时,
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import twitter4j.Status
object TrendingHashTags {
def main(args: Array[String]): Unit = {
if (args.length < 8) {
System.err.println("Usage: TrendingHashTags <consumer key> <consumer secret> " +
"<access token> <access token secret> " +
"<language> <batch interval> <min-threshold> <show-count> " +
"[<filters>]")
System.exit(1)
}
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret,
lang, batchInterval, minThreshold, showCount ) = args.take(8)
val filters = args.takeRight(args.length - 8)
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val conf = new SparkConf().setMaster(("local[4]")).setAppName("TrendingHashTags")
val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
ssc.checkpoint("checkpoint")
val tweets = TwitterUtils.createStream(ssc, None, filters)
val tweetsFilteredByLang = tweets.filter{tweet => tweet.getLang() == lang}
val statuses = tweetsFilteredByLang.map{tweet => tweet.getText()}
val words = statuses.flatMap{status => status.split("""\s+""")}
val hashTags = words.filter{word => word.startsWith("#")}
val hashTagPairs = hashTags.map{hashtag => (hashtag, 1)}
val tagsWithCounts = hashTagPairs.updateStateByKey(
(counts: Seq[Int], prevCount: Option[Int]) =>
prevCount.map{c => c + counts.sum}.orElse{Some(counts.sum)}
)
val topHashTags = tagsWithCounts.filter { case (t, c) =>
c > minThreshold.toInt
}
val sortedTopHashTags = topHashTags.transform{rdd =>
rdd.sortBy({case(w, c) => c}, false)
}
sortedTopHashTags.print(showCount.toInt)
ssc.start()
ssc.awaitTermination()
}
}
我得到了以下错误的堆栈跟踪。
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
at TrendingHashTags$.main(TrendingHashTags.scala:28)
at TrendingHashTags.main(TrendingHashTags.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
这是我的build.sbt内容。
name := "sparkStreaming"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.4.5",
"org.apache.spark" %% "spark-sql" % "2.4.5",
"org.apache.spark" %% "spark-streaming" % "2.4.5" % "provided",
"org.apache.spark" %% "spark-streaming-twitter" % "1.6.3")
明显的迹象是,在内部的某些地方,你正在使用较低版本的火花......(火花1.5可能是)。(火花1.5可能是)
sbt inspect tree clean
你可以用这个来检查,对于maven用户来说。mvn depdency:tree
将给出所有使用的依赖关系列表
还有一点就是你用的是
"org.apache.spark" %% "spark-streaming" % "2.4.5" % "provided",
改为 default
maven范围 compile
和看。
类似问题及答案 此处