I have a string from which I want to create a dataset. The string is split into rows by \n and into fields by \t:

8 "SOMETHING" 15236236 "2" "SOMETHING" "SOMETHHING"

So I split the string by \n, create a List<String> from it, then create a JavaRDD with a JavaSparkContext instance, and finally try to create the dataset with the sqlContext method createDataset. This compiles fine, and if I put a breakpoint on the return statement of the loadDataset() method I can see the settingsDataset dataset; the code only fails once the first action is invoked on it.

The way I tried to do this is:
private Dataset<Row> loadDataset() {
    InputStream in;
    Dataset<Row> settingsDataset = null;
    try {
        JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());
        in = getClass().getResourceAsStream("filename.tsv");
        String settingsFileAsString = IOUtils.toString(in, Charsets.UTF_8);
        List<String> settingsFileAsList = Arrays.asList(settingsFileAsString.split("\n"));
        Encoder<Row> encoder = RowEncoder.apply(getSchema());
        JavaRDD settingsFileAsRDD = jsc.parallelize(settingsFileAsList);
        settingsDataset = session.sqlContext().createDataset(settingsFileAsRDD.rdd(), encoder).toDF();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return settingsDataset;
}
private org.apache.spark.sql.types.StructType getSchema() {
    return DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("f_1", DataTypes.StringType, true),
        DataTypes.createStructField("f_2", DataTypes.StringType, true),
        DataTypes.createStructField("f_3", DataTypes.StringType, true),
        DataTypes.createStructField("f_4", DataTypes.StringType, true),
        DataTypes.createStructField("f_5", DataTypes.StringType, true),
        DataTypes.createStructField("f_6", DataTypes.StringType, true)
    });
}
The problem is that the DAG cannot be built; the code fails with the following exception:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
JavaRDD settingsFileAsRDD = jsc.parallelize(settingsFileAsList);

is actually a JavaRDD<String>, but it needs to be a JavaRDD<Row>. You should split those "rows" by \t and build a Row from each one with RowFactory.create(s.split("\t")). See the example below:
SparkSession spark = SparkSession.builder().master("local").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
String settingsFileAsString = "1\t2\t3\t4\t5\t6\n7\t8\t9\t10\t11\t12";
List<String> settingsFileAsList = Arrays.asList(settingsFileAsString.split("\n"));
Encoder<Row> encoder = RowEncoder.apply(getSchema());
JavaRDD<Row> settingsFileAsRDD = jsc.parallelize(settingsFileAsList).map(s->RowFactory.create(s.split("\t")));
Dataset<Row> settingsDataset = spark.createDataset(settingsFileAsRDD.rdd(), encoder).toDF();
settingsDataset.show();
Result:
+---+---+---+---+---+---+
|f_1|f_2|f_3|f_4|f_5|f_6|
+---+---+---+---+---+---+
| 1| 2| 3| 4| 5| 6|
| 7| 8| 9| 10| 11| 12|
+---+---+---+---+---+---+
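One detail worth noting about RowFactory.create(s.split("\t")): RowFactory.create takes Object... varargs, and a String[] passed to an Object... parameter is handed over as the varargs array itself rather than wrapped as a single argument, which is why each field becomes its own column. A minimal plain-Java sketch of that varargs behavior (class and method names are illustrative, no Spark needed):

```java
public class VarargsSpread {
    // Mirrors the Object... signature of RowFactory.create.
    public static int count(Object... values) {
        return values.length;
    }

    public static void main(String[] args) {
        String[] fields = "1\t2\t3\t4\t5\t6".split("\t");
        // A String[] is assignable to Object[], so the compiler passes the
        // array itself as the varargs array: one argument per field.
        System.out.println(count(fields));          // 6
        // Casting to Object forces wrapping: the whole array is one argument.
        System.out.println(count((Object) fields)); // 1
    }
}
```

As an aside, SparkSession also offers createDataFrame(JavaRDD&lt;Row&gt;, StructType), so spark.createDataFrame(settingsFileAsRDD, getSchema()) would produce the same Dataset&lt;Row&gt; without an explicit encoder.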