鉴于这个例子;
val someRDD = firstRDD.flatMap{ case(x,y) => SomeFunc(y)}
val oneRDD = someRDD.reduceByKey(_+_)
oneRDD.saveAsNewAPIHadoopFile("dir/to/write/to", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])
哪个更好?
val someRDD = firstRDD.flatMap{ case(x,y) => SomeFunc(y)}.persist(storage.StorageLevel.MEMORY_AND_DISK_SER)
val oneRDD = someRDD.reduceByKey(_+_)
oneRDD.saveAsNewAPIHadoopFile("dir/to/write/to", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])
要么
val someRDD = firstRDD.flatMap{ case(x,y) => SomeFunc(y)}.persist(storage.StorageLevel.MEMORY_AND_DISK_SER)
val oneRDD = someRDD.reduceByKey(_+_).persist(storage.StorageLevel.MEMORY_AND_DISK_SER)
oneRDD.saveAsNewAPIHadoopFile("dir/to/write/to", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])
或者是其他东西?
我发现当你在同一个RDD上执行多个动作时,坚持下去是很好的。
例如;
val newRDD = context.parallelize(0 until numMappers, numPartitions).persist(storage.StorageLevel.MEMORY_AND_DISK_SER) #persisted bc there are two follow on actions preformed on it.
newRDD.count() #same RDD
newRDD.saveAsNewAPIHadoopFile() #same RDD
...other actions etc.
这里只有一个RDD和两个行动。我应该坚持所有。
来自Spark文档:
即使没有用户调用
reduceByKey
,Spark也会在shuffle操作(例如persist
)中自动保留一些中间数据。这样做是为了避免在shuffle期间节点发生故障时重新计算整个输入。我们仍然建议用户在生成的RDD上调用persist,如果他们计划重用它。
(我在上面的陈述中添加了粗体)
请注意,链接转换很好。重用RDD时会出现性能问题