说我有一个数据集:
Dataset<Row> sqlDF = this.spark.sql("SELECT first_name, last_name, age from persons";
这将返回包含三列的Dataset
:名字,姓氏,年龄。
我想应用一个将age
列加5并返回与原始数据集相同的列,但更改了年龄值的新数据集的函数:
public int add_age(int old_age){
return old_age + 5;
}
如何使用Java上的Apache Spark进行此操作?
我通过构造一个StructType并添加了三列,然后将它们映射到新构造的行,并使用age
将函数应用于行列RowFactory
,解决了此问题:
StructType customStructType = new StructType();
customStructType = customStructType.add("first_name", DataTypes.StringType, true);
customStructType = customStructType.add("last_name", DataTypes.StringType, true);
customStructType = customStructType.add("age", DataTypes.IntegerType, true);
ExpressionEncoder<Row> customTypeEncoder = null;
Dataset<Row> changed_data = sqlDF.map(row->{
return RowFactory.create(row.get(0),row.get(1), add_age(row.get(2)));
}, RowEncoder.apply(customStructType));