任何指针都可以逐步训练和构建模型,并获得单个元素的预测。
尝试运行Web应用程序将在共享路径中将数据写入csv,ml应用程序将读取数据并加载模型,尝试拟合数据并保存模型,转换测试数据。 (这应该在循环中发生)
但是当第二次加载保存的模型时,面临以下异常,(我使用minmax缩放器来规范化数据)
线程“main”中的异常java.lang.IllegalArgumentException:输出列features_intermediate已存在。
任何指针都将非常感谢,谢谢
object RunAppPooling {
def main(args: Array[String]): Unit = { // start the spark session
val conf = new SparkConf().setMaster("local[2]").set("deploy-mode", "client").set("spark.driver.bindAddress", "127.0.0.1")
.set("spark.broadcast.compress", "false")
.setAppName("local-spark")
val spark = SparkSession
.builder()
.config(conf)
.getOrCreate()
val filePath = "src/main/resources/train.csv"
val modelPath = "file:///home/vagrant/custom.model"
val schema = StructType(
Array(
StructField("IDLE_COUNT", IntegerType),
StructField("TIMEOUTS", IntegerType),
StructField("ACTIVE_COUNT", IntegerType),
StructField("FACTOR_LOAD", DoubleType)))
while(true){
// read the raw data
val df_raw = spark
.read
.option("header", "true")
.schema(schema)
.csv(filePath)
df_raw.show()
println(df_raw.count())
// fill all na values with 0
val df = df_raw.na.fill(0)
df.printSchema()
// create the feature vector
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT" ))
.setOutputCol("features_intermediate")
var lr1: PipelineModel = null
try {
lr1 = PipelineModel.load(modelPath)
} catch {
case ie: InvalidInputException => println(ie.getMessage)
}
import org.apache.spark.ml.feature.StandardScaler
val scaler = new StandardScaler().setWithMean(true).setWithStd(true).setInputCol("features_intermediate").setOutputCol("features")
var pipeline: Pipeline = null
if (lr1 == null) {
val lr =
new LinearRegression()
.setMaxIter(100)
.setRegParam(0.1)
.setElasticNetParam(0.8)
.setLabelCol("FACTOR_LOAD") // setting label column
// create the pipeline with the steps
pipeline = new Pipeline().setStages(Array( vectorAssembler, scaler, lr))
} else {
pipeline = new Pipeline().setStages(Array(vectorAssembler, scaler, lr1))
}
// create the model following the pipeline steps
val cvModel = pipeline.fit(df)
// save the model
cvModel.write.overwrite.save(modelPath)
var testschema = StructType(
Array(
StructField("PACKAGE_KEY", StringType),
StructField("IDLE_COUNT", IntegerType),
StructField("TIMEOUTS", IntegerType),
StructField("ACTIVE_COUNT", IntegerType)
))
val df_raw1 = spark
.read
.option("header", "true")
.schema(testschema)
.csv("src/main/resources/test_pooling.csv")
// fill all na values with 0
val df1 = df_raw1.na.fill(0)
val extracted = cvModel.transform(df1) //.toDF("prediction")
import org.apache.spark.sql.functions._
val test = extracted.select(mean(df("FACTOR_LOAD"))).collect()
println(test.apply(0))
}
}
}
我找到了一种方法,至少要消除异常,不确定它是否是正确的apporach。在加载模型后创建管道时,将阶段设置为仅模型,因为模型已经定义了各自的架构。不确定这是否会使新数据正常化。
pipeline = new Pipeline().setStages(Array( lr1))