我需要将Cassandra分区写为实木复合地板文件。由于我无法在foreach函数中共享和使用sparkSession。首先,我调用collect方法来收集驱动程序中的所有数据,然后将镶木地板文件写入HDFS,如下所示。
感谢此链接https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md
我能够获取分区行。每当从cassandra表中读取分区时,我都希望将分区行写入单独的实木复合地板文件中。我还尝试了sparkSQLContext,该方法将任务结果写为临时结果。我认为,在完成所有任务之后。我将看到镶木地板文件。
有什么方便的方法吗?
val keyedTable : CassandraTableScanRDD[(Tuple2[Int, Date], MyCassandraTable)] = getTableAsKeyed()
keyedTable.groupByKey
.collect
.foreach(f => {
import sparkSession.implicits._
val items = f._2.toList
val key = f._1
val baseHDFS = "hdfs://mycluster/parquet_test/"
val ds = sparkSession.sqlContext.createDataset(items)
ds.write
.option("compression", "gzip")
.parquet(baseHDFS + key._1 + "/" + key._2)
})
为什么不到处使用Spark SQL并使用Parquet的内置功能来按分区写入数据,而不是自己创建目录层次结构?
类似这样的东西:
import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("table", "keyspace").load()
data.write
.option("compression", "gzip")
.partitionBy("col1", "col2")
.parquet(baseHDFS)
在这种情况下,它将为col
和col2
的每个值创建一个单独的目录,作为嵌套目录,其名称如下:${column}=${value}
。然后,当您阅读时,您可能会强制只阅读特定值。