我有一个函数,它会抛出大量数据(数十亿行)并返回元组
数据集[(Seq [Data1],Seq [Data2],Seq [Data3])]
此结果数据集包含更多行(与输入相比)
val result: Dataset[ (Seq[Data1], Seq[Data2], Seq[Data3]) ] = process(file, spark)
Seq[Data1] = billions rows
Seq[Data2] = millions rows
Seq[Data3] = millions rows (less than Data2)
现在我需要将这3个序列写入单独的avro文件中。
如何有效地做到这一点?
目前我使用flatmap函数来分隔这些Seqs:
result.flatMap( row => row._1).write.mode(SaveMode.Append).avro(path1) //Data1
result.flatMap( row => row._2).write.mode(SaveMode.Append).avro(path2) //Data2
result.flatMap( row => row._3).write.mode(SaveMode.Append).avro(path3) //Data3
从处理时间开始,我看到flatMap和写avro文件的所有3个调用都花了相同的时间。因此看起来过程函数被称为3次(每个flatmap)
如何只调用一次过程函数然后才过滤结果?
是否可以使用缓存? (数据集包含数十亿行)
result.cache()
你建议做什么?
注意:我使用Spark 2.2版,scala 2.11.8
您可以尝试缓存结果,如果有足够的内存,结果将被缓存,否则缓存将失败,结果将重新计算。