火花的外部包装中的多项式回归

问题描述 投票:8回答:2

在为这个主题投入大量的网络搜索之后,如果我能得到一些指针,我将在这里结束。请进一步阅读

在分析Spark 2.0之后,我得出结论多项式回归是不可能的火花(单独的火花),所以是否有一些可用于多项式回归的火花扩展? - Rspark它可以完成(但寻找更好的替代方案) - 火花中的RFormula做预测,但系数不可用(这是我的主要要求,因为我主要对系数值感兴趣)

machine-learning regression apache-spark-mllib
2个回答
3
投票

多项式回归只是线性回归的另一种情况(如Polynomial regression is linear regressionPolynomial regression)。由于Spark有一种线性回归方法,您可以调用该方法来更改输入,使新输入适合多项式回归。例如,如果您只有一个自变量x,并且想要进行二次回归,则必须更改[x x ^ 2]的独立输入矩阵。


0
投票

我想在@Mehdi Lamrani的回答中添加一些信息:

如果要在SparkML中进行多项式线性回归,可以使用PolynomialExpansion类。有关信息,请查看SparkML DocSpark API Doc中的课程

这是一个实现示例:

假设我们有一个列车和测试数据集,存储在两个csv文件中,标题包含列的名称(功能,标签)。每个数据集包含三个名为f1,f2,f3的特征,每个特征为Double(这是X矩阵),以及名为mylabel的标签特征(Y向量)。

对于这段代码,我使用了Spark + Scala:Scala版本:2.12.8 Spark版本2.4.0。

我们假设SparkML库已经在build.sbt中下载了。

首先,进口图书馆:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.{SparkConf, SparkContext}

创建Spark会话和Spark上下文:

val ss = org.apache.spark.sql
  .SparkSession.builder()
  .master("local")
  .appName("Read CSV")
  .enableHiveSupport()
  .getOrCreate()

val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)

实例化您要使用的变量:

val f_train:String = "path/to/your/train_file.csv"
val f_test:String = "path/to/your/test_file.csv"
val degree:Int = 3 // Set the degree of your choice
val maxIter:Int = 10 // Set the max number of iterations
val lambda:Double = 0.0 // Set your lambda
val alpha:Double = 0.3 // Set the learning rate 

首先,让我们先创建几个udf-s,它们将用于数据读取和预处理。参数'udf toFeatures的类型将是Vector,后跟特征的参数类型:(Double,Double,Double

val toFeatures = udf[Vector, Double, Double, Double] {
        (a,b,c) => Vectors.dense(a,b,c)
    }

val encodeIntToDouble    = udf[Double, Int](_.toDouble)

现在让我们创建一个从CSV中提取数据并使用PolynomialExpansion创建现有新特征的函数:

def getDataPolynomial(
    currentfile:String, 
    sc:SparkSession, 
    sco:SparkContext, 
    degree:Int
    ):DataFrame = 
{
    val df_rough:DataFrame = sc.read
      .format("csv")
      .option("header", "true") //first line in file has headers
      .option("mode", "DROPMALFORMED")
      .option("inferSchema", value=true)
      .load(currentfile)
      .toDF("f1", "f2", "f3", "myLabel")
       // you may add or not the last line 

    val df:DataFrame = df_rough
        .withColumn("featNormTemp", toFeatures(df_rough("f1"), df_rough("f2"), df_rough("f3")))
        .withColumn("label", Tools.encodeIntToDouble(df_rough("myLabel")))

    val polyExpansion = new PolynomialExpansion()
      .setInputCol("featNormTemp")
      .setOutputCol("polyFeatures")
      .setDegree(degree)

    val polyDF:DataFrame=polyExpansion.transform(df.select("featNormTemp"))

    val datafixedWithFeatures:DataFrame = polyDF.withColumn("features", polyDF("polyFeatures"))

    val datafixedWithFeaturesLabel = datafixedWithFeatures
          .join(df,df("featNormTemp") === datafixedWithFeatures("featNormTemp"))
          .select("label", "polyFeatures")

    datafixedWithFeaturesLabel
}

现在,使用所选择的多项式展开度,为列车和测试数据集运行该功能。

val X:DataFrame = getDataPolynomial(f_train,ss,sc,degree)
val X_test:DataFrame = getDataPolynomial(f_test,ss,sc,degree)

运行算法以使用管道获得线性回归模型:

val assembler = new VectorAssembler()
   .setInputCols(Array("polyFeatures"))
   .setOutputCol("features2")

val lr = new LinearRegression()
    .setMaxIter(maxIter)
    .setRegParam(lambda)
    .setElasticNetParam(alpha)
    .setFeaturesCol("features2")
    .setLabelCol("label")

// Fit the model:
val pipeline:Pipeline = new Pipeline().setStages(Array(assembler,lr))
val lrModel:PipelineModel = pipeline.fit(X)

// Get prediction on the test set :
val result:DataFrame = lrModel.transform(X_test)

最后,使用均方误差测量来评估结果:

def leastSquaresError(result:DataFrame):Double = {
    val rm:RegressionMetrics = new RegressionMetrics(
        result
            .select("label","prediction")
            .rdd
            .map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
    Math.sqrt(rm.meanSquaredError)
}

val error:Double = leastSquaresError(result)
println("Error : "+error)

我希望这可能有用!

© www.soinside.com 2019 - 2024. All rights reserved.