计算我的RDD在大型Dstream中的记录

问题描述 投票:2回答:2

我正在尝试使用文件DStream读取的大型RDD。

代码如下:

val creatingFunc = { () =>
  val conf = new SparkConf()
              .setMaster("local[10]")
              .setAppName("FileStreaming")
              .set("spark.streaming.fileStream.minRememberDuration", "2000000h")
              .registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text], classOf[GGSN]))

  val sc = new SparkContext(conf)

  // Create a StreamingContext
  val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

  val appFile = httpFileLines
                  .map(x=> (x._1,x._2.toString()))
                  .filter(!_._2.contains("ggsnIPAddress"))
                  .map(x=>(x._1,x._2.split(",")))

  var count=0

  appFile.foreachRDD(s => {
    // s.collect() throw exception due to insufficient amount of emery
    //s.count() throw exception due to insufficient amount of memory
  s.foreach(x => count = count + 1)
  })

  println(count)
  newContextCreated = true
  ssc
}

我想要做的是得到我的RDD的计数...因为它很大..引发异常..所以我需要做一个foreach而不是收集数据到内存..

我想得到计数然后作为我的代码中的方式,但它总是给0 ..

有没有办法做到这一点?

scala apache-spark spark-streaming
2个回答
0
投票

没有必要foreachRDD并致电count。您可以使用count上定义的DStream方法:

val appFile = httpFileLines
                .map(x => (x._1, x._2.toString()))
                .filter(!_._2.contains("ggsnIPAddress"))
                .map(x => (x._1, x._2.split(",")))

val count = appFile.count()

如果仍然产生不足的内存异常,则需要每次都计算较小批量的数据,或者扩大工作节点以处理负载。


0
投票

关于您的解决方案,您应该避免收集并总结DStream的每个RDD的计数。

var count=0
appFile.foreachRDD { rdd => {
    count = count + rdd.count()
    }
}

但我发现这个解决方案非常难看(在scala中使用var)。

我更喜欢以下解决方案:

val count: Long = errorDStream.count().reduce(_+_)

请注意,count方法返回DStream为Long而不是Long,这就是为什么你需要使用reduce

© www.soinside.com 2019 - 2024. All rights reserved.