将多个SparkML管道应用于单个DataFrame

问题描述 投票:1回答:1

我使用SparkML训练了几毫升的管道并将它们保存在HDFS中。现在,我想将管道应用于同一数据帧。我实现了一个通用评分类,它读入管道和数据,将每个管道应用到数据框,并将模型预测作为新列附加。这是我的Java代码示例:

    List<PipelineModel> models = readPipelineModels(...)
    Dataset<Row> originalDf = spark.read().parquet(...)
    Dataset<Row> mergedDf = originalDf;
    for (PipelineModel pipelineModel : models) {
            Dataset<Row> applyDf = pipelineModel.transform(originalDf);
            applyDf = dropDuplicateColumns(applyDf, mergedDf); // drops columns in applyDf which are present in mergedDf
            mergedDf = mergedDf.withColumn("rowId", monotonically_increasing_id());
            applyDf = applyDf.withColumn("rowId", monotonically_increasing_id());
            mergedDf = mergedDf.join(applyDf, "rowId").drop("rowId").cache();
    }

我注意到一些性能问题,特别是对于大型数据集。用于绑定数据帧的连接非常昂贵,并且在各阶段之间进行了大量的改组。

请注意,我将每个模型应用于originalDf而不是mergedDf。如果我在每次迭代中将模型应用于mergedDf,我会收到一条错误,指出前一次迭代中“列xy已存在”。

您有什么建议来改善这份工作的表现吗?

apache-spark machine-learning apache-spark-sql apache-spark-ml
1个回答
0
投票

几点说明:

  • 我不会分别制作两列monotonically_increasing_id。不能保证两者都会增加相同的值。数据集1可以获得1,2,3,数据集2可以获得1000,2005,3999
  • 我认为您不需要删除/合并DF。我会反复使用应用模型的结果。

就像是:

List<PipelineModel> models = readPipelineModels(...);
Dataset<Row> mergedDf = spark.read().parquet(...);
int i = 0;
for (PipelineModel pipelineModel : models) {
    i += 1;
    mergedDf = pipelineModel.transform(mergedDf);
    mergedDf = mergedDf.withColumnRenamed("yourModelOutput", "model_outputs_" + i);
}

FWIW我已经习惯了PySpark并且在我的脑海里翻译 - 但这就是你如何解决它的要点。

© www.soinside.com 2019 - 2024. All rights reserved.