在为这个主题投入大量的网络搜索之后,如果我能得到一些指针,我将在这里结束。请进一步阅读
在分析Spark 2.0之后,我得出结论多项式回归是不可能的火花(单独的火花),所以是否有一些可用于多项式回归的火花扩展? - Rspark它可以完成(但寻找更好的替代方案) - 火花中的RFormula做预测,但系数不可用(这是我的主要要求,因为我主要对系数值感兴趣)
多项式回归只是线性回归的另一种情况(如Polynomial regression is linear regression和Polynomial regression)。由于Spark有一种线性回归方法,您可以调用该方法来更改输入,使新输入适合多项式回归。例如,如果您只有一个自变量x,并且想要进行二次回归,则必须更改[x x ^ 2]的独立输入矩阵。
我想在@Mehdi Lamrani的回答中添加一些信息:
如果要在SparkML中进行多项式线性回归,可以使用PolynomialExpansion类。有关信息,请查看SparkML Doc或Spark API Doc中的课程
这是一个实现示例:
假设我们有一个列车和测试数据集,存储在两个csv文件中,标题包含列的名称(功能,标签)。每个数据集包含三个名为f1,f2,f3的特征,每个特征为Double
(这是X矩阵),以及名为mylabel的标签特征(Y向量)。
对于这段代码,我使用了Spark + Scala:Scala版本:2.12.8 Spark版本2.4.0。
我们假设SparkML库已经在build.sbt中下载了。
首先,进口图书馆:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.{SparkConf, SparkContext}
创建Spark会话和Spark上下文:
val ss = org.apache.spark.sql
.SparkSession.builder()
.master("local")
.appName("Read CSV")
.enableHiveSupport()
.getOrCreate()
val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
实例化您要使用的变量:
val f_train:String = "path/to/your/train_file.csv"
val f_test:String = "path/to/your/test_file.csv"
val degree:Int = 3 // Set the degree of your choice
val maxIter:Int = 10 // Set the max number of iterations
val lambda:Double = 0.0 // Set your lambda
val alpha:Double = 0.3 // Set the learning rate
首先,让我们先创建几个udf-s,它们将用于数据读取和预处理。参数'udf toFeatures的类型将是Vector
,后跟特征的参数类型:(Double,Double,Double
)
val toFeatures = udf[Vector, Double, Double, Double] {
(a,b,c) => Vectors.dense(a,b,c)
}
val encodeIntToDouble = udf[Double, Int](_.toDouble)
现在让我们创建一个从CSV中提取数据并使用PolynomialExpansion创建现有新特征的函数:
def getDataPolynomial(
currentfile:String,
sc:SparkSession,
sco:SparkContext,
degree:Int
):DataFrame =
{
val df_rough:DataFrame = sc.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.option("inferSchema", value=true)
.load(currentfile)
.toDF("f1", "f2", "f3", "myLabel")
// you may add or not the last line
val df:DataFrame = df_rough
.withColumn("featNormTemp", toFeatures(df_rough("f1"), df_rough("f2"), df_rough("f3")))
.withColumn("label", Tools.encodeIntToDouble(df_rough("myLabel")))
val polyExpansion = new PolynomialExpansion()
.setInputCol("featNormTemp")
.setOutputCol("polyFeatures")
.setDegree(degree)
val polyDF:DataFrame=polyExpansion.transform(df.select("featNormTemp"))
val datafixedWithFeatures:DataFrame = polyDF.withColumn("features", polyDF("polyFeatures"))
val datafixedWithFeaturesLabel = datafixedWithFeatures
.join(df,df("featNormTemp") === datafixedWithFeatures("featNormTemp"))
.select("label", "polyFeatures")
datafixedWithFeaturesLabel
}
现在,使用所选择的多项式展开度,为列车和测试数据集运行该功能。
val X:DataFrame = getDataPolynomial(f_train,ss,sc,degree)
val X_test:DataFrame = getDataPolynomial(f_test,ss,sc,degree)
运行算法以使用管道获得线性回归模型:
val assembler = new VectorAssembler()
.setInputCols(Array("polyFeatures"))
.setOutputCol("features2")
val lr = new LinearRegression()
.setMaxIter(maxIter)
.setRegParam(lambda)
.setElasticNetParam(alpha)
.setFeaturesCol("features2")
.setLabelCol("label")
// Fit the model:
val pipeline:Pipeline = new Pipeline().setStages(Array(assembler,lr))
val lrModel:PipelineModel = pipeline.fit(X)
// Get prediction on the test set :
val result:DataFrame = lrModel.transform(X_test)
最后,使用均方误差测量来评估结果:
def leastSquaresError(result:DataFrame):Double = {
val rm:RegressionMetrics = new RegressionMetrics(
result
.select("label","prediction")
.rdd
.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
Math.sqrt(rm.meanSquaredError)
}
val error:Double = leastSquaresError(result)
println("Error : "+error)
我希望这可能有用!