如果我有LOCALLY执行的函数,则在foreachRdd中使用kafka代码进行Spart流式传输不会执行

问题描述 投票:0回答:1

我已在本地设置spark 2.2并使用scala

spark会话配置如下

val sparkSession = SparkSession
  .builder()
  .appName("My application")
  .config("es.nodes", "localhost:9200")
  .config("es.index.auto.create", true)
  .config("spark.streaming.backpressure.initialRate", "1")
  .config("spark.streaming.kafka.maxRatePerPartition", "7")
  .master("local[2]")
  .enableHiveSupport()
  .getOrCreate()

我在我的本地机器上运行火花

当我做

  kafkaStream.foreachRDD(rdd => {
   calledFunction(rdd)
 })


def calledFunction(rdd: RDD[ConsumerRecord[String, String]]): Unit ={

 rdd.foreach(r=>{
 print("hello")})
}

对于我本地机器上面的代码“hello”不打印,但所有作业都排成一行。

如果我改变我的代码

kafkaStream.foreachRDD(rdd => { rdd.foreach(r=>{ print("hello")}) })

然后它在控制台上打印“你好”。

你能帮我解决一下这个问题吗?

scala apache-spark spark-streaming
1个回答
0
投票

当使用spark 1.6运行时,它在控制台中打印你好。这里参考的是示例代码

val message = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc,
  kafkaConf,
  Map("test" ->1),
  StorageLevel.MEMORY_ONLY
)
val lines = message.map(_._2)
lines.foreachRDD(rdd => {calledFunction(rdd)})


def calledFunction(rdd: RDD[String]): Unit ={
  rdd.foreach(r=>{
    print("hello")})
}

希望这可以帮助。由于依赖性不匹配,我暂时无法使用spark 2.0重新生成相同的问题。

© www.soinside.com 2019 - 2024. All rights reserved.