通过以下代码,如何将JavaRDD 转换为DataFrame或DataSet

问题描述 投票:0回答:1
public static void main(String[] args) {
        SparkSession sessn = SparkSession.builder().appName("RDD2DF").master("local").getOrCreate();
        List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
        Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
        System.out.println(DF.javaRDD().getNumPartitions());
        JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());

}

根据以上代码,我无法将JavaRdd(mappartRdd)转换为Java Spark中的DataFrame。我正在使用下面的方法将JavaRdd转换为DataFrame / DataSet。

sessn.createDataFrame(mappartRdd, beanClass);

我为createDataFrame尝试了多个选项和不同的重载函数。我面临将其转换为DF的问题。我需要提供什么beanclass才能使代码正常工作?

与scala不同,没有像toDF()这样的函数可以将RDD转换为Java中的DataFrame。有人可以根据我的要求协助将其转换。

注意:我可以通过如下修改上述代码直接创建数据集。

Dataset<Integer> mappartDS = DF.repartition(3).mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT());

但是我想知道为什么如果我使用createDataFrame时,我的JavaRdd不能转换为DF / DS。任何帮助将不胜感激。

apache-spark apache-spark-sql rdd sparkcore
1个回答
0
投票

这似乎是this SO Question的跟进

我认为,您正在学习火花。我建议理解提供的Java的api-https://spark.apache.org/docs/latest/api/java/index.html

关于您的问题,如果您检查createDataFrame api,如下所示-

 def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
...
}

如您所见,它将JavaRDD[Row]和相关的StructType模式作为args。因此,要创建等于DataFrameDataset<Row>,请使用以下代码段-

  StructType schema = new StructType()
                .add(new StructField("value", DataTypes.IntegerType, true, Metadata.empty()));
        Dataset<Row> df = spark.createDataFrame(mappartRdd.map(RowFactory::create), schema);
        df.show(false);
        df.printSchema();

        /**
         * +-----+
         * |value|
         * +-----+
         * |6    |
         * |8    |
         * |6    |
         * +-----+
         *
         * root
         *  |-- value: integer (nullable = true)
         */
© www.soinside.com 2019 - 2024. All rights reserved.