如何解释火花列的类型减少

问题描述 投票:-2回答:2

我有以下表

DEST_COUNTRY_NAME   ORIGIN_COUNTRY_NAME count
United States       Romania             15
United States       Croatia             1
United States       Ireland             344
Egypt               United States       15  

该表被表示为数据集。

scala> dataDS
res187: org.apache.spark.sql.Dataset[FlightData] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

dataDS的模式是

scala> dataDS.printSchema;
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

我要总结的count列的所有值。我想我可以用reduceDataset方法做到这一点。

我想我可以做到以下几点,但得到的错误

scala> (dataDS.select(col("count"))).reduce((acc,n)=>acc+n);
<console>:38: error: type mismatch;
 found   : org.apache.spark.sql.Row
 required: String
       (dataDS.select(col("count"))).reduce((acc,n)=>acc+n);
                                                         ^

为了使代码的工作,我必须明确指定countInt即使在架构中,它是一个Int

scala> (dataDS.select(col("count").as[Int])).reduce((acc,n)=>acc+n);

为什么我要明确指定count的类型?为什么Scala的type inference没有工作?实际上,中间Dataset的架构也推断count作为Int

dataDS.select(col("count")).printSchema;
root
 |-- count: integer (nullable = true)
scala apache-spark fold apache-spark-dataset
2个回答
2
投票

只要按照类型或看编译器的消息。

  • 你开始Dataset[FlightData]
  • 你调用它的selectcol("count")作为参数。 col(_)返回Column
  • The only variantDataset.select这需要Column返回DataFrame which is an alias for Dataset[Row]
  • Dataset.reduce一个服用ReduceFunction[T]和第二(T, T) => T,其中T是键入Dataset,即Dataset[T]的构造参数的两种变型。 (acc,n)=>acc+n功能是Scala的匿名函数,因此第二个版本适用。
  • 扩展: (dataDS.select(col("count")): Dataset[Row]).reduce((acc: Row, n: Row) => acc + n): Row 其中规定的限制 - 函数接受RowRow并返回Row
  • Row没有+的方法,因此是唯一的选择,以满足 (acc: ???, n: Row) => acc + n) 是使用String(你可以+ AnyString。 然而,这并不满足完整的表达 - 因此错误。
  • 你已经想通了,你可以使用 dataDS.select(col("count").as[Int]).reduce((acc, n) => acc + n) 其中col("count").as[Int]TypedColumn[Row, Int]corresponding select返回Dataset[Int]。 同样你可以 dataDS.select(col("count")).as[Int].reduce((acc, n) => acc + n) dataDS.toDF.map(_.getAs[Int]("count")).reduce((acc, n) => acc + n) 在所有情况下 .reduce((acc, n) => acc + n) (Int, Int) => Int

3
投票

我认为你需要做的是另一种方式。我将承担FlightData是区分阶级与上面的架构。因此,解决方案是使用地图和如下减少

val totalSum = dataDS.map(_.count).reduce(_+_) //this line replace the above error as col("count") can't be selected.

更新:当您使用选择您将数据框上工作(同样的,如果你加入),这是不是静态类型的架构,你将失去你的案件类别的特征推理的问题是不相关的数据集,其实,。例如,选择的类型将是数据帧不DataSet,以便您将无法推断出类型。

val x: DataFrame = dataDS.select('count)
val x: Dataset[Int] = dataDS.map(_.count)

另外,从这个Answer要获得TypedColumn从列只需使用myCol.as[T]

我做了一个简单的例子来重现代码和数据。

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object EntryMainPoint extends App {

  //val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("SparkSessionZipsExample")
    //.config("spark.sql.warehouse.dir", warehouseLocation)
    .getOrCreate()

  val someData = Seq(
    Row("United States", "Romania", 15),
    Row("United States", "Croatia", 1),
    Row("United States", "Ireland", 344),
    Row("Egypt", "United States", 15)
  )


  val flightDataSchema = List(
    StructField("DEST_COUNTRY_NAME", StringType, true),
    StructField("ORIGIN_COUNTRY_NAME", StringType, true),
    StructField("count", IntegerType, true)
  )

  case class FlightData(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: Int)
  import spark.implicits._

  val dataDS = spark.createDataFrame(
    spark.sparkContext.parallelize(someData),
    StructType(flightDataSchema)
  ).as[FlightData]

  val totalSum = dataDS.map(_.count).reduce(_+_) //this line replace the above error as col("count") can't be selected.
  println("totalSum = " + totalSum)


  dataDS.printSchema()
  dataDS.show()


}

下面的输出

totalSum = 375

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
+-----------------+-------------------+-----+

注意:您可以通过以下方式从数据集做一个选择

val countColumn = dataDS.select('count) //or map(_.count)

您也可以在此reduceByKey in Spark Dataset看看

© www.soinside.com 2019 - 2024. All rights reserved.