我使用SparkML训练了几毫升的管道并将它们保存在HDFS中。现在,我想将管道应用于同一数据帧。我实现了一个通用评分类,它读入管道和数据,将每个管道应用到数据框,并将模型预测作为新列附加。这是我的Java代码示例:
List<PipelineModel> models = readPipelineModels(...)
Dataset<Row> originalDf = spark.read().parquet(...)
Dataset<Row> mergedDf = originalDf;
for (PipelineModel pipelineModel : models) {
Dataset<Row> applyDf = pipelineModel.transform(originalDf);
applyDf = dropDuplicateColumns(applyDf, mergedDf); // drops columns in applyDf which are present in mergedDf
mergedDf = mergedDf.withColumn("rowId", monotonically_increasing_id());
applyDf = applyDf.withColumn("rowId", monotonically_increasing_id());
mergedDf = mergedDf.join(applyDf, "rowId").drop("rowId").cache();
}
我注意到一些性能问题,特别是对于大型数据集。用于绑定数据帧的连接非常昂贵,并且在各阶段之间进行了大量的改组。
请注意,我将每个模型应用于originalDf而不是mergedDf。如果我在每次迭代中将模型应用于mergedDf,我会收到一条错误,指出前一次迭代中“列xy已存在”。
您有什么建议来改善这份工作的表现吗?
几点说明:
monotonically_increasing_id
。不能保证两者都会增加相同的值。数据集1可以获得1,2,3,数据集2可以获得1000,2005,3999就像是:
List<PipelineModel> models = readPipelineModels(...);
Dataset<Row> mergedDf = spark.read().parquet(...);
int i = 0;
for (PipelineModel pipelineModel : models) {
i += 1;
mergedDf = pipelineModel.transform(mergedDf);
mergedDf = mergedDf.withColumnRenamed("yourModelOutput", "model_outputs_" + i);
}
FWIW我已经习惯了PySpark并且在我的脑海里翻译 - 但这就是你如何解决它的要点。