将Cassandra行分组然后使用Spark写为木地板文件

问题描述 投票:0回答:1

我需要将Cassandra分区写为实木复合地板文件。由于我无法在foreach函数中共享和使用sparkSession。首先,我调用collect方法来收集驱动程序中的所有数据,然后将镶木地板文件写入HDFS,如下所示。

感谢此链接https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md

我能够获取分区行。每当从cassandra表中读取分区时,我都希望将分区行写入单独的实木复合地板文件中。我还尝试了sparkSQLContext,该方法将任务结果写为临时结果。我认为,在完成所有任务之后。我将看到镶木地板文件。

有什么方便的方法吗?

val keyedTable : CassandraTableScanRDD[(Tuple2[Int, Date], MyCassandraTable)] = getTableAsKeyed()

keyedTable.groupByKey
      .collect
      .foreach(f => {
        import sparkSession.implicits._
        val items = f._2.toList
        val key = f._1
        val baseHDFS = "hdfs://mycluster/parquet_test/"
        val ds = sparkSession.sqlContext.createDataset(items)

    ds.write
      .option("compression", "gzip")
      .parquet(baseHDFS + key._1 + "/" + key._2)

  })
scala apache-spark spark-cassandra-connector
1个回答
0
投票

为什么不到处使用Spark SQL并使用Parquet的内置功能来按分区写入数据,而不是自己创建目录层次结构?

类似这样的东西:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("table", "keyspace").load()

data.write
      .option("compression", "gzip")
      .partitionBy("col1", "col2")
      .parquet(baseHDFS)

在这种情况下,它将为colcol2的每个值创建一个单独的目录,作为嵌套目录,其名称如下:${column}=${value}。然后,当您阅读时,您可能会强制只阅读特定值。

© www.soinside.com 2019 - 2024. All rights reserved.