Creating a Dataset from a string in Spark SQL


I have a string from which I want to create a dataset; the string is delimited into rows by \n and into fields by \t, for example: 8 "SOMETHING" 15236236 "2" "SOMETHING" "SOMETHHING". So I split the string on \n, build a List<String> from it, create a JavaRDD from that with a JavaSparkContext instance, and then try to create the dataset with the sqlContext method createDataset. This compiles fine, and if I put a breakpoint on the return statement of the loadDataset() method I can see the settingsDataset dataset; it only fails once the code invokes the first action.

This is how I am trying to do it:

private Dataset<Row> loadDataset(){
    InputStream in;
    Dataset<Row> settingsDataset = null;
    try {
      JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());
      in = getClass().getResourceAsStream("filename.tsv");
      String settingsFileAsString = IOUtils.toString(in, Charsets.UTF_8);
      List<String> settingsFileAsList = Arrays.asList(settingsFileAsString.split("\n"));
      Encoder<Row> encoder = RowEncoder.apply(getSchema());
      JavaRDD settingsFileAsRDD = jsc.parallelize(settingsFileAsList);
      settingsDataset = session.sqlContext().createDataset(settingsFileAsRDD.rdd(), encoder).toDF();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return settingsDataset;
 }

  private org.apache.spark.sql.types.StructType getSchema() {
    return DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("f_1", DataTypes.StringType, true),
        DataTypes.createStructField("f_2", DataTypes.StringType, true),
        DataTypes.createStructField("f_3", DataTypes.StringType, true),
        DataTypes.createStructField("f_4", DataTypes.StringType, true),
        DataTypes.createStructField("f_5", DataTypes.StringType, true),
        DataTypes.createStructField("f_6", DataTypes.StringType, true)
    });
  }

The problem is that the DAG cannot be built; the code fails with the following exception: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row

apache-spark apache-spark-sql
1 Answer

JavaRDD settingsFileAsRDD = jsc.parallelize(settingsFileAsList); is actually a JavaRDD<String>, but it needs to be a JavaRDD<Row>. Because the raw JavaRDD hides the element type at compile time and Spark evaluates transformations lazily, the mismatch only surfaces when the encoder is applied at the first action. You should split each of these "rows" on \t and build a Row from the resulting fields with RowFactory.create(s.split("\t")). See the example below:

SparkSession spark = SparkSession.builder().master("local").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
String settingsFileAsString = "1\t2\t3\t4\t5\t6\n7\t8\t9\t10\t11\t12";
List<String> settingsFileAsList = Arrays.asList(settingsFileAsString.split("\n"));
Encoder<Row> encoder = RowEncoder.apply(getSchema());
// Split each line on \t and wrap the fields in a Row, so the element type matches the encoder
JavaRDD<Row> settingsFileAsRDD = jsc.parallelize(settingsFileAsList).map(s -> RowFactory.create(s.split("\t")));
Dataset<Row> settingsDataset = spark.createDataset(settingsFileAsRDD.rdd(), encoder).toDF();
settingsDataset.show();

Result:

+---+---+---+---+---+---+
|f_1|f_2|f_3|f_4|f_5|f_6|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  7|  8|  9| 10| 11| 12|
+---+---+---+---+---+---+
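
As a side note, the RowEncoder can be avoided entirely. Below is a minimal, self-contained sketch (the class name TsvToDataset is illustrative, and the schema simply replicates getSchema() from the question) of two alternatives that should work on Spark 2.2+: SparkSession.createDataFrame, which accepts a JavaRDD<Row> together with a StructType directly, and the CSV reader, which can parse an in-memory Dataset<String> with \t as the field separator:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TsvToDataset { // illustrative class name
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local").getOrCreate();
    List<String> lines = Arrays.asList("1\t2\t3\t4\t5\t6\n7\t8\t9\t10\t11\t12".split("\n"));

    // Alternative 1: createDataFrame takes a JavaRDD<Row> plus a schema,
    // so no Encoder<Row> is needed. The (Object[]) cast makes the varargs
    // call spread the split fields into separate columns.
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<Row> rows = jsc.parallelize(lines)
        .map(s -> RowFactory.create((Object[]) s.split("\t")));
    spark.createDataFrame(rows, getSchema()).show();

    // Alternative 2 (Spark 2.2+): hand an in-memory Dataset<String> to the
    // CSV reader and set the field separator to \t.
    Dataset<String> ds = spark.createDataset(lines, Encoders.STRING());
    spark.read().schema(getSchema()).option("sep", "\t").csv(ds).show();

    spark.stop();
  }

  // Same schema as in the question: six nullable string columns.
  private static StructType getSchema() {
    return DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("f_1", DataTypes.StringType, true),
        DataTypes.createStructField("f_2", DataTypes.StringType, true),
        DataTypes.createStructField("f_3", DataTypes.StringType, true),
        DataTypes.createStructField("f_4", DataTypes.StringType, true),
        DataTypes.createStructField("f_5", DataTypes.StringType, true),
        DataTypes.createStructField("f_6", DataTypes.StringType, true)
    });
  }
}

Both variants print the same six-column table as above; the CSV reader additionally handles quoted fields, which may matter for the "SOMETHING"-style values in the question's sample data.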