The following code is part of my Twitter streaming application, which uses Spark Streaming.
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)
// Set the system properties so that Twitter4j library used by twitter stream
// can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
Whenever I run this program, I get the following error:
Exception in thread "main" scala.MatchError: [Ljava.lang.String;@323659f8 (of class [Ljava.lang.String;)
at SparkPopularHashTags$.main(SparkPopularHashTags.scala:18)
at SparkPopularHashTags.main(SparkPopularHashTags.scala)
Line 18 is:
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
I saved the Twitter4j.properties file in my F:\Software\ItelliJ\Projects\twitterStreamApp\src folder, and it is formatted like this:
oauth.consumerKey=***
oauth.consumerSecret=***
oauth.accessToken=***
oauth.accessTokenSecret=***
where "*" stands for my keys, with no surrounding quotes (for example: oauth.consumerKey=h12b31289fh7139fbh138ry).
Can anyone help me solve this problem?
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
object SparkPopularHashTags {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {
    sc.setLogLevel("WARN")
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    // val filters = args.takeRight(args.length - 4)
    args.lift(0).foreach { consumerKey =>
      System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    }
    args.lift(1).foreach { consumerSecret =>
      System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    }
    args.lift(2).foreach { accessToken =>
      System.setProperty("twitter4j.oauth.accessToken", accessToken)
    }
    args.lift(3).foreach { accessTokenSecret =>
      System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    }
    val filters = args.drop(4)
    // Set the system properties so that Twitter4j library used by twitter stream
    // can use them to generate OAuth credentials
    // System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    // System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    // System.setProperty("twitter4j.oauth.accessToken", accessToken)
    // System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    // Create a StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    // Split the stream on space and extract hashtags
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
    // Get the top hashtags over the previous 60 sec window
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))
    // Get the top hashtags over the previous 10 sec window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))
    // Print the tweets in the current DStream
    stream.print()
    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
This is the problem:
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
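If fewer than four arguments are supplied, this fails: args.take(4) then returns an array with fewer than four elements, and the Array(...) pattern on the left-hand side only matches an array of exactly four, so Scala throws a MatchError.
Instead, you need to check args to make sure the values are actually present. For example: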
args.lift(0).foreach { consumerKey =>
  System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
}
args.lift(1).foreach { consumerSecret =>
  System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
}
args.lift(2).foreach { accessToken =>
  System.setProperty("twitter4j.oauth.accessToken", accessToken)
}
args.lift(3).foreach { accessTokenSecret =>
  System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
}
val filters = args.drop(4)
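If you would rather stop with a clear message than silently skip missing values, one possible alternative is to match on the argument list and return a usage string when it is too short. This is only a sketch: the TwitterArgs object, the Credentials case class, and the usage text are hypothetical names made up for illustration, not part of Spark or Twitter4j.

```scala
object TwitterArgs {
  // Hypothetical holder for the four OAuth values plus any remaining filter terms.
  final case class Credentials(
      consumerKey: String,
      consumerSecret: String,
      accessToken: String,
      accessTokenSecret: String,
      filters: Seq[String])

  // Returns Left with a usage message when fewer than four arguments are given,
  // instead of letting a pattern match throw scala.MatchError.
  def parse(args: Array[String]): Either[String, Credentials] =
    args.toList match {
      case key :: secret :: token :: tokenSecret :: rest =>
        Right(Credentials(key, secret, token, tokenSecret, rest))
      case _ =>
        Left("Usage: SparkPopularHashTags <consumerKey> <consumerSecret> " +
          "<accessToken> <accessTokenSecret> [filters...]")
    }
}
```

In main you could then pattern match on TwitterArgs.parse(args), print the message and exit on Left, and call System.setProperty for each field on Right.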
This only happens when you have not set your program arguments, or have set too few of them, that is, fewer than four.
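To convince yourself of this without starting Spark, a small standalone sketch like the following reproduces the behaviour. MatchErrorDemo and destructure are hypothetical names used only for this illustration; the Try wrapper is there just so the failure can be observed rather than crashing the program.

```scala
import scala.util.Try

object MatchErrorDemo {
  // Mirrors the failing line: the Array pattern needs exactly four elements.
  def destructure(args: Array[String]): Try[(String, String, String, String)] =
    Try {
      val Array(a, b, c, d) = args.take(4)
      (a, b, c, d)
    }

  def main(unused: Array[String]): Unit = {
    // Two arguments: take(4) returns only two elements, so the match throws.
    println(destructure(Array("key", "secret")).isFailure) // prints true
    // Five arguments: take(4) yields exactly four, so the match succeeds.
    println(destructure(Array("k", "s", "t", "ts", "#spark")).isSuccess) // prints true
  }
}
```

In IntelliJ, the arguments come from the "Program arguments" field of the run configuration, so an empty field corresponds to the first case.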