Explicit conversion of VectorAssembler output to a dense vector

Question

How can I convert the output of a VectorAssembler into a dense vector rather than a sparse vector?

import scala.collection.mutable
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler

val featureIndexer = new VectorAssembler().setInputCols(Array("feature1", "feature2", "feature3")).setOutputCol("indexedFeatures")

training_set_combined = training_set_combined.na.fill(-9999)
testing_set_combined = testing_set_combined.na.fill(-9999)

// training
val assembler = new VectorAssembler().setInputCols(feature_names.toArray).setOutputCol("features")

def get_param(): mutable.HashMap[String, Any] = {
  val params = new mutable.HashMap[String, Any]()
  params += "eta" -> 0.1f
  params += "num_round" -> 150
  params += "missing" -> -999
  params += "subsample" -> 1
  params += "objective" -> "binary:logistic"
  params
}

val xgb = new XGBoostClassifier(get_param().toMap).setLabelCol("label").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(Array(assembler, xgb))
val xgbclassifier = pipeline.fit(training_set_combined)

I am looking for a way to convert the VectorAssembler output to a dense vector.

scala apache-spark apache-spark-mllib
1 Answer
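The core of the conversion is just calling .toDense on each ML vector inside a UDF. As a minimal sketch (the DataFrame name "assembled" and the column names are placeholders, not from your code):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Minimal sketch: densify a single vector column produced by VectorAssembler.
val toDense = udf((v: Vector) => v.toDense)
val densified = assembled.withColumn("features_dense", toDense(col("features")))

The implementation below wraps this same idea in a reusable Transformer so the conversion can be used as a Pipeline stage.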

Here is the implementation of the dense vector converter:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCols}
import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._


/**
 * Converts one or more vector columns (e.g. the output of VectorAssembler)
 * into dense vectors, writing the result to the corresponding output columns.
 */
class DenseVectorConverter(val uid: String) extends Transformer with Params
  with HasInputCols with HasOutputCols with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("denseVectorConverter"))

  /** @group setParam */
  def setInputCols(value: Array[String]): this.type = set(inputCols, value)

  /** @group setParam */
  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)

  def validateAndTransformSchema(schema: StructType): StructType = {
    require($(inputCols).length == $(inputCols).distinct.length, s"inputCols contains" +
      s" duplicates: (${$(inputCols).mkString(", ")})")
    require($(outputCols).length == $(outputCols).distinct.length, s"outputCols contains" +
      s" duplicates: (${$(outputCols).mkString(", ")})")
    require($(inputCols).length == $(outputCols).length, s"inputCols(${$(inputCols).length})" +
      s" and outputCols(${$(outputCols).length}) should have the same length")

    $(inputCols).zip($(outputCols)).foldLeft(schema) { (schema, inOutCol) =>
      val inputField = schema(inOutCol._1)
      require(inputField.dataType == SQLDataTypes.VectorType, s"Expected datatype of input col: ${inputField.name} as " +
        s"vector but found ${inputField.dataType}")
      schema.add(inOutCol._2, inputField.dataType, inputField.nullable, inputField.metadata)
    }
  }

  def transformSchema(schema: StructType): StructType = validateAndTransformSchema(schema)

  def copy(extra: ParamMap): DenseVectorConverter = defaultCopy(extra)

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    // UDF that converts any ML vector (sparse or dense) to its dense representation.
    val sparseToDense =
      udf((v: org.apache.spark.ml.linalg.Vector) => v.toDense)
    // For every input/output column pair, add a densified copy of the input column.
    $(inputCols).zip($(outputCols)).foldLeft(dataset.toDF()) { (df, inputColOutputCol) =>
      df.withColumn(inputColOutputCol._2,
        sparseToDense(col(inputColOutputCol._1)))
    }
  }
}
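One optional addition, only if you intend to save a Pipeline containing this stage and load it back from disk (this is an assumption about your workflow, not part of the answer above): the class also needs a companion object providing a reader.

import org.apache.spark.ml.util.DefaultParamsReadable

// Companion object so that DenseVectorConverter (and pipelines containing it)
// can be restored with .load(path). Only needed if you persist the model.
object DenseVectorConverter extends DefaultParamsReadable[DenseVectorConverter] {
  override def load(path: String): DenseVectorConverter = super.load(path)
}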

I tested it with the test case below:

import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
df.show(false)
//    +---------------------+
//    |features             |
//    +---------------------+
//    |(5,[1,3],[1.0,7.0])  |
//    |[2.0,0.0,3.0,4.0,5.0]|
//    |[4.0,0.0,0.0,6.0,7.0]|
//    +---------------------+
val denseVectorConverter = new DenseVectorConverter()
  .setInputCols(Array("features"))
  .setOutputCols(Array("features_dense"))
denseVectorConverter.transform(df).show(false)
//      +---------------------+---------------------+
//      |features             |features_dense       |
//      +---------------------+---------------------+
//      |(5,[1,3],[1.0,7.0])  |[0.0,1.0,0.0,7.0,0.0]|
//      |[2.0,0.0,3.0,4.0,5.0]|[2.0,0.0,3.0,4.0,5.0]|
//      |[4.0,0.0,0.0,6.0,7.0]|[4.0,0.0,0.0,6.0,7.0]|
//      +---------------------+---------------------+
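If you prefer to verify the result programmatically rather than by eyeballing the output, a small assertion along these lines should work (again a sketch, not part of the original test):

import org.apache.spark.ml.linalg.{DenseVector, Vector}

// Collect the densified column and assert every row holds a DenseVector.
val allDense = denseVectorConverter.transform(df)
  .select("features_dense")
  .collect()
  .forall(_.getAs[Vector](0).isInstanceOf[DenseVector])
assert(allDense, "Expected every row of features_dense to be a DenseVector")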

Now your modified code should look like this:

import scala.collection.mutable
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler

val featureIndexer = new VectorAssembler().setInputCols(Array("feature1", "feature2", "feature3")).setOutputCol("indexedFeatures")

training_set_combined = training_set_combined.na.fill(-9999)
testing_set_combined = testing_set_combined.na.fill(-9999)

// training
val assembler = new VectorAssembler().setInputCols(feature_names.toArray).setOutputCol("features")

// densify the assembled features before feeding them to XGBoost
val denseVectorConverter = new DenseVectorConverter()
  .setInputCols(Array("features"))
  .setOutputCols(Array("features_dense"))

def get_param(): mutable.HashMap[String, Any] = {
  val params = new mutable.HashMap[String, Any]()
  params += "eta" -> 0.1f
  params += "num_round" -> 150
  params += "missing" -> -999
  params += "subsample" -> 1
  params += "objective" -> "binary:logistic"
  params
}

val xgb = new XGBoostClassifier(get_param().toMap).setLabelCol("label").setFeaturesCol("features_dense")
val pipeline = new Pipeline().setStages(Array(assembler, denseVectorConverter, xgb))
val xgbclassifier = pipeline.fit(training_set_combined)
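Once the pipeline is fitted, scoring the held-out data is just a transform on the resulting PipelineModel. A minimal sketch, assuming testing_set_combined has the same feature columns (and a label column if you want to compare against it):

// Apply the fitted pipeline (assembler -> dense converter -> XGBoost) to the test set.
val predictions = xgbclassifier.transform(testing_set_combined)
predictions.select("label", "probability", "prediction").show(5, truncate = false)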

I have modified your code accordingly; please give it a try.
