Streaming with Kafka in an sbt project

Problem description

I am stuck on the following problem: I can pull data from the Kafka topic into the stream, but when I join the streamed RDD with a Dataset[String] and dump the resulting data (after some processing) to another Kafka topic, an error is thrown.

The error is thrown at the following lines:

  tRDD.take(20)
  val temp: RDD[String] = tRDD.rdd
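
For context, these two lines only materialize the joined result; the write to the output topic happens later in the foreachPartition block. Since res.toJSON yields a Dataset[String] whose single column is named value, the same data could in principle also be written with the spark-sql-kafka batch sink instead of a hand-rolled producer. This is only a minimal sketch, assuming spark-sql-kafka-0-10 is actually on the runtime classpath (it is marked provided in the build file below):

  // sketch only: write the joined result straight to the output topic;
  // assumes the Kafka SQL sink (spark-sql-kafka-0-10) is available at runtime
  res.toJSON                                            // Dataset[String], column "value"
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "serverip:9092")
    .option("topic", "UberDataResultDataTopics")
    .save()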

SBT file:

scalaVersion := "2.11.8"
val sparkVersion = "2.3.1"
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.7.0",
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" % "spark-streaming_2.11" % sparkVersion,
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion,
  "org.apache.kafka" % "kafka-clients" % "0.10.0.1",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided"
)
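
One detail in this file worth flagging: spark-sql-kafka-0-10 is scoped as provided, so it is visible at compile time but is not packaged with the application and, by default, is not on the classpath of a plain sbt run. If the structured Kafka sink sketched above were used, the scope would need to be dropped (or the jar supplied at submit time, e.g. via spark-submit --packages). A sketch of that single line, under the assumption that the application bundles the dependency itself:

  // assumption: the application ships this dependency instead of relying on the cluster to provide it
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion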

Kafka / Spark streaming code:

 import org.apache.kafka.clients.consumer.ConsumerConfig

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.kafka010._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka010.ConsumerStrategies
 import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 import org.apache.spark.ml.feature.VectorAssembler
 import org.apache.spark.ml.clustering.KMeansModel
 import org.apache.spark.rdd.RDD
 import org.apache.spark.ml._
 import org.apache.kafka.clients.producer._
 import java.util.Properties

 object SparkKafkaConsumerProducer  extends App {

 val brokers = "serverip:9092"
 val groupId = "sparkApplication"
 val batchInterval = "2"
 val pollTimeout = "1000"
 val sourceTopics = "UberDataTopics"
 val sourceTopicsSet = sourceTopics.split(",").toSet
 val kafka_topic:String = "UberDataResultDataTopics" //publish topics
 val modelpath="hdfs://serverip:9000/Machine-Learning-models/KmeansModel"

 val sparkConf = new SparkConf().setAppName("ClusterUberStream")
 val spark = SparkSession.builder().master("local[2]").getOrCreate()
 val ssc = new StreamingContext(spark.sparkContext, 
 Seconds(batchInterval.toInt))

 System.setProperty("hadoop.home.dir", "C:\\winutils")

 import spark.implicits._
 val schema = StructType(Array(
 StructField("dt", TimestampType, true),
 StructField("lat", DoubleType, true),
 StructField("lon", DoubleType, true),
 StructField("cluster",IntegerType, true),
 StructField("base", StringType, true)
 ))
 /***********************************************************/

 System.out.println("Subscribed to : " + sourceTopics)

 val kafkaConsumerParams = Map[String, String](
   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
   ConsumerConfig.GROUP_ID_CONFIG -> groupId,
   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
   "spark.kafka.poll.time" -> pollTimeout,
   "spark.streaming.kafka.consumer.poll.ms" -> "8192"
 )

 // load model for getting clusters
val model = KMeansModel.load(modelpath)
model.clusterCenters.foreach(println)
var ac = new Array[Center](20)
System.out.println("model loaded")

//  create a dataframe with cluster centers to join with stream
var index: Int = 0
model.clusterCenters.foreach(x => {
 ac(index) = Center(index, x(0), x(1))
index += 1
})
val cc: RDD[Center] = spark.sparkContext.parallelize(ac)
val ccdf = cc.toDF()
val ccds=ccdf.as[Uber]

val consumerStrategy = ConsumerStrategies.Subscribe[String, String](sourceTopicsSet, kafkaConsumerParams)

val messagesDStream = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, consumerStrategy
 )
// get message values from key,value
val valuesDStream: DStream[String] = messagesDStream.map(_.value())
//process stream
valuesDStream.foreachRDD { rdd =>

if (!rdd.isEmpty) {
  val count = rdd.count
  println("count received " + count)
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  var uber: Uber = new Uber("1", 22, 33, "3") //check needed

  val df = rdd.map(uber.parseUber).toDF()
  println("uber data")
  df.show()
  // get features to pass to model
  val featureCols = Array("lat", "lon")
  val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
  val df2 = assembler.transform(df)

  // get cluster categories from  model
  val categories = model.transform(df2)
  categories.createOrReplaceTempView("uber")

  val clust = categories.select($"dt", $"lat", $"lon", $"base", $"prediction".alias("cid")).orderBy($"dt")

  val sqlDf = spark.sql("select dt, lat, lon, base, prediction as cid from uber")

  val res = clust.join(ccds, Seq("cid")).orderBy($"dt")
  val tRDD: org.apache.spark.sql.Dataset[String] = res.toJSON
  tRDD.take(20)
  val temp: RDD[String] = tRDD.rdd

  val props = new Properties()
  props.put("bootstrap.servers", "serverip:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  temp.foreachPartition { eachPartition =>
    val kProducer = new KafkaProducer[String, String](props)
    eachPartition.toList.foreach { eachElement =>
      val kMessage = new ProducerRecord[String, String](kafka_topic, null, eachElement)
      kProducer.send(kMessage)
    }
  }
}
}

// Start the computation
println("start streaming")

ssc.remember(Minutes(1))
ssc.start()
ssc.awaitTerminationOrTimeout(1 * 5 * 1000)

}
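
One thing to note about the foreachPartition block above: the KafkaProducer is created per partition but never flushed or closed, so records still sitting in the producer's buffer can be lost when the task finishes. A minimal sketch of the same loop with an explicit flush and close, reusing the identifiers from the code above:

 temp.foreachPartition { eachPartition =>
   val kProducer = new KafkaProducer[String, String](props)
   try {
     // send every JSON record of this partition to the output topic
     eachPartition.foreach { eachElement =>
       kProducer.send(new ProducerRecord[String, String](kafka_topic, null, eachElement))
     }
   } finally {
     kProducer.flush()   // push out anything still buffered
     kProducer.close()   // release the producer's resources
   }
 }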

Error stack trace:

INFO SparkContext: Starting job: show at DataStream.scala:128
INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 25.6 KB, free 366.1 MB)
INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 10.4 KB, free 366.1 MB)
INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on serverip:43538 (size: 10.4 KB, free: 366.2 MB)
INFO SparkContext: Created broadcast 24 from broadcast at DAGScheduler.scala:1039
INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 23 (MapPartitionsRDD[68] at show at uberDataStream.scala:128) (first 15 tasks are for partitions Vector(0))
INFO TaskSchedulerImpl: Adding task set 23.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 23.0 (TID 23, localhost, executor driver, partition 0, PROCESS_LOCAL, 7740 bytes)
INFO Executor: Running task 0.0 in stage 23.0 (TID 23)
INFO KafkaRDD: Computing topic UberDataTopics, partition 0 offsets 529 -> 532
INFO CachedKafkaConsumer: Initial fetch for spark-executor-sparkApplication UberDataTopics 0 529
ERROR ContextCleaner: Error in cleaning thread
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    at
scala apache-kafka sbt spark-streaming apache-spark-dataset
1 Answer

You need to make the following changes to your build file.
