How do I incrementally load a pipeline model, fit it to new data, and save it with Spark?


Any pointers on how to incrementally train and build the model, and to get a prediction for a single element, would be appreciated.

The setup I am trying: a web application writes data to a CSV file on a shared path; the ML application reads that data, loads the saved model, fits the new data, saves the model again, and transforms the test data. (This is meant to happen in a loop.)

But when the saved model is loaded the second time, I hit the following exception (I normalize the data with a StandardScaler):

Exception in thread "main" java.lang.IllegalArgumentException: Output column features_intermediate already exists.

Any pointers would be greatly appreciated, thanks.

import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.SparkConf
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.mean
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

object RunAppPooling {

  def main(args: Array[String]): Unit = {
    // start the Spark session
    val conf = new SparkConf()
      .setMaster("local[2]")
      .set("deploy-mode", "client")
      .set("spark.driver.bindAddress", "127.0.0.1")
      .set("spark.broadcast.compress", "false")
      .setAppName("local-spark")

    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val filePath = "src/main/resources/train.csv"
    val modelPath = "file:///home/vagrant/custom.model"

    val schema = StructType(
      Array(
        StructField("IDLE_COUNT", IntegerType),
        StructField("TIMEOUTS", IntegerType),
        StructField("ACTIVE_COUNT", IntegerType),
        StructField("FACTOR_LOAD", DoubleType)))

    while (true) {
      // read the raw training data
      val df_raw = spark
        .read
        .option("header", "true")
        .schema(schema)
        .csv(filePath)

      df_raw.show()
      println(df_raw.count())

      // fill all na values with 0
      val df = df_raw.na.fill(0)
      df.printSchema()

      // create the feature vector
      val vectorAssembler = new VectorAssembler()
        .setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT"))
        .setOutputCol("features_intermediate")

      // load a previously saved model, if one exists; on the first iteration
      // the path is missing and the load throws InvalidInputException
      var lr1: PipelineModel = null
      try {
        lr1 = PipelineModel.load(modelPath)
      } catch {
        case ie: InvalidInputException => println(ie.getMessage)
      }

      val scaler = new StandardScaler()
        .setWithMean(true)
        .setWithStd(true)
        .setInputCol("features_intermediate")
        .setOutputCol("features")

      var pipeline: Pipeline = null
      if (lr1 == null) {
        // first iteration: build the pipeline from scratch
        val lr = new LinearRegression()
          .setMaxIter(100)
          .setRegParam(0.1)
          .setElasticNetParam(0.8)
          .setLabelCol("FACTOR_LOAD") // setting label column
        pipeline = new Pipeline().setStages(Array(vectorAssembler, scaler, lr))
      } else {
        // later iterations: reuse the loaded model as the final stage
        pipeline = new Pipeline().setStages(Array(vectorAssembler, scaler, lr1))
      }

      // fit the model following the pipeline steps
      val cvModel = pipeline.fit(df)

      // save the model
      cvModel.write.overwrite.save(modelPath)

      val testschema = StructType(
        Array(
          StructField("PACKAGE_KEY", StringType),
          StructField("IDLE_COUNT", IntegerType),
          StructField("TIMEOUTS", IntegerType),
          StructField("ACTIVE_COUNT", IntegerType)))

      val df_raw1 = spark
        .read
        .option("header", "true")
        .schema(testschema)
        .csv("src/main/resources/test_pooling.csv")

      // fill all na values with 0
      val df1 = df_raw1.na.fill(0)
      val extracted = cvModel.transform(df1) //.toDF("prediction")
      val test = extracted.select(mean(df("FACTOR_LOAD"))).collect()
      println(test.apply(0))
    }
  }
}
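For context, the failure mode can be reproduced in isolation. The sketch below is pasteable into spark-shell (the toy rows and the /tmp path are made up for illustration, not taken from the application above). It shows that a persisted PipelineModel keeps its own VectorAssembler stage, so wrapping the loaded model in a new Pipeline together with a second assembler declares the features_intermediate column twice, and Pipeline.fit fails while validating the chained schema:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import spark.implicits._

// toy training data with the same column names as above
val df = Seq((1, 2, 3, 0.5), (4, 5, 6, 0.9), (7, 8, 9, 1.2))
  .toDF("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT", "FACTOR_LOAD")

val assembler = new VectorAssembler()
  .setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT"))
  .setOutputCol("features_intermediate")
val scaler = new StandardScaler()
  .setInputCol("features_intermediate")
  .setOutputCol("features")
val lr = new LinearRegression().setLabelCol("FACTOR_LOAD")

// first iteration: fit the full pipeline and persist it
val model = new Pipeline().setStages(Array(assembler, scaler, lr)).fit(df)
model.write.overwrite.save("/tmp/custom.model")

// second iteration: the loaded PipelineModel still carries all three stages
val loaded = PipelineModel.load("/tmp/custom.model")
loaded.stages.foreach(s => println(s.getClass.getSimpleName))
// VectorAssembler, StandardScalerModel, LinearRegressionModel

// prepending a second assembler declares features_intermediate twice;
// Pipeline.fit validates the chained schema up front and throws
// java.lang.IllegalArgumentException: Output column features_intermediate already exists.
new Pipeline().setStages(Array(assembler, scaler, loaded)).fit(df)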
apache-spark apache-spark-mllib apache-spark-ml
1 Answer

I found a way to at least get rid of the exception, though I am not sure whether it is the correct approach. When building the pipeline after loading the model, set the stages to just the loaded model, since the loaded model already defines the schemas of its own stages. I am not sure whether this still normalizes the new data.

  pipeline = new Pipeline().setStages(Array(lr1))
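On the "not sure whether this normalizes the new data" point: the fitted StandardScalerModel is persisted inside the PipelineModel and is re-applied by transform(), so the scaling learned at fit time is preserved. A quick check, assuming the model was saved at the path from the question:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.feature.StandardScalerModel

val loaded = PipelineModel.load("file:///home/vagrant/custom.model")

// the fitted scaler remembers the mean/std vectors learned at fit time
// and re-applies them whenever transform() is called on new data
loaded.stages.foreach {
  case s: StandardScalerModel => println(s"scaler: mean=${s.mean} std=${s.std}")
  case other                  => println(other.getClass.getSimpleName)
}

One caveat: a loaded PipelineModel is a Transformer, so fitting a Pipeline whose only stage is lr1 does not retrain the LinearRegression on the new data; Pipeline.fit passes transformer stages through unchanged, and the resulting model predicts exactly as the saved one did.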