我陷入了以下问题,我能够从Kafka主题中提取数据触发流,但是当我将RDD流加入Dataset [String]并将结果数据(经过一些处理)转储到另一个Kafka主题时,抛出错误。
下一行错误:
tRDD.take(20)
val temp: RDD[String] = tRDD.rdd
SBT文件:
scalaVersion := "2.11.8"
val sparkVersion = "2.3.1"
libraryDependencies ++= Seq(
"org.apache.hadoop"% "hadoop-client" % "2.7.0",
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" % "spark-streaming_2.11" % sparkVersion,
"org.apache.spark" % "spark-sql_2.11" % sparkVersion,
"org.apache.kafka" % "kafka-clients" % "0.10.0.1",
"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided"
)
Kafka / Spark流代码:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.clustering.KMeansModel
import org.apache.spark.rdd.RDD
import org.apache.spark.ml._
import org.apache.kafka.clients.producer._
import java.util.Properties
object SparkKafkaConsumerProducer extends App {
val brokers = "serverip:9092"
val groupId = "sparkApplication"
val batchInterval = "2"
val pollTimeout = "1000"
val sourceTopics = "UberDataTopics"
val sourceTopicsSet = sourceTopics.split(",").toSet
val kafka_topic:String = "UberDataResultDataTopics" //publish topics
val modelpath="hdfs://serverip:9000/Machine-Learning-models/KmeansModel"
val sparkConf = new SparkConf().setAppName("ClusterUberStream")
val spark = SparkSession.builder().master("local[2]").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,
Seconds(batchInterval.toInt))
System.setProperty("hadoop.home.dir", "C:\\winutils")
import spark.implicits._
val schema = StructType(Array(
StructField("dt", TimestampType, true),
StructField("lat", DoubleType, true),
StructField("lon", DoubleType, true),
StructField("cluster",IntegerType, true),
StructField("base", StringType, true)
))
/***********************************************************/
System.out.println("Subscribed to : " + sourceTopics)
val kafkaConsumerParams = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig. ENABLE_AUTO_COMMIT_CONFIG-> "true",
"spark.kafka.poll.time" -> pollTimeout,
"spark.streaming.kafka.consumer.poll.ms" -> "8192"
)
// load model for getting clusters
val model = KMeansModel.load(modelpath)
model.clusterCenters.foreach(println)
var ac = new Array[Center](20)
System.out.println("model loaded" )
// create a dataframe with cluster centers to join with stream
var index: Int = 0
model.clusterCenters.foreach(x => {
ac(index) = Center(index, x(0), x(1))
index += 1
})
/* val cc: RDD[Center] = spark.sparkContext.parallelize(ac)
val ccdf = cc.toDF() */
val cc: RDD[Center] = spark.sparkContext.parallelize(ac)
val ccdf = cc.toDF()
val ccds=ccdf.as[Uber]
val consumerStrategy = ConsumerStrategies.Subscribe[String, String]
(sourceTopicsSet, kafkaConsumerParams)
val messagesDStream = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, consumerStrategy
)
// get message values from key,value
val valuesDStream: DStream[String] = messagesDStream.map(_.value())
//process stream
valuesDStream.foreachRDD { rdd =>
if (!rdd.isEmpty) {
val count = rdd.count
println("count received " + count)
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
var uber: Uber = new Uber("1", 22, 33, "3") //check needed
val df = rdd.map(uber.parseUber).toDF()
println("uber data")
df.show()
// get features to pass to model
val featureCols = Array("lat", "lon")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val df2 = assembler.transform(df)
// get cluster categories from model
val categories = model.transform(df2)
categories.createOrReplaceTempView("uber")
val clust = categories.select($"dt", $"lat", $"lon", $"base", $"prediction".alias("cid")).orderBy($"dt")
val sqlDf=spark.sql("select dt,lat,lon,base,prediction.alias(cid) from uber")
val res = clust.join(ccds, Seq("cid")).orderBy($"dt")
val tRDD: org.apache.spark.sql.Dataset[String] = res.toJSON
tRDD.take(20)
val temp: RDD[String] = tRDD.rdd
val props = new Properties()
props.put("bootstrap.servers", "serverip:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
temp.foreachPartition { eachPartition => {
val kProducer = new KafkaProducer[String, String](props)
eachPartition.toList.foreach { eachElement => {
val kMessage = new ProducerRecord[String, String](kafka_topic, null, eachElement)
kProducer.send(kMessage)
}}}}
}
}
// Start the computation
println("start streaming")
ssc.remember(Minutes(1))
ssc.start()
ssc.awaitTerminationOrTimeout(1 * 5 * 1000)
}
错误堆栈跟踪:
INFO SparkContext: Starting job: show at DataStream.scala:128
INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 25.6 KB, free 366.1 MB)
INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 10.4 KB, free 366.1 MB)
INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on serverip:43538 (size: 10.
4 KB, free: 366.2 MB)
INFO SparkContext: Created broadcast 24 from broadcast at DAGScheduler.scala:1039
INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 23 (MapPartitionsRDD[68] at show at uberDataStream.scala:128) (first 15 tasks are
for partitions Vector(0))
INFO TaskSchedulerImpl: Adding task set 23.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 23.0 (TID 23, localhost,
executor driver, partition 0, PROCESS_LOCAL, 774
0 bytes)
INFO Executor: Running task 0.0 in stage 23.0 (TID 23)
INFO KafkaRDD: Computing topic UberDataTopics, partition 0 offsets 529 -> 532
INFO CachedKafkaConsumer: Initial fetch for spark-executor-sparkApplication
UberDataTopics 0 529
ERROR ContextCleaner: Error in cleaning thread
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
at
您需要对构建文件进行以下更改