我有以下表
DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME count
United States Romania 15
United States Croatia 1
United States Ireland 344
Egypt United States 15
该表被表示为数据集。
scala> dataDS
res187: org.apache.spark.sql.Dataset[FlightData] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
dataDS
的模式是
scala> dataDS.printSchema;
root
|-- DEST_COUNTRY_NAME: string (nullable = true)
|-- ORIGIN_COUNTRY_NAME: string (nullable = true)
|-- count: integer (nullable = true)
我要总结的count
列的所有值。我想我可以用reduce
的Dataset
方法做到这一点。
我想我可以做到以下几点,但得到的错误
scala> (dataDS.select(col("count"))).reduce((acc,n)=>acc+n);
<console>:38: error: type mismatch;
found : org.apache.spark.sql.Row
required: String
(dataDS.select(col("count"))).reduce((acc,n)=>acc+n);
^
为了使代码的工作,我必须明确指定count
是Int
即使在架构中,它是一个Int
scala> (dataDS.select(col("count").as[Int])).reduce((acc,n)=>acc+n);
为什么我要明确指定count
的类型?为什么Scala的type inference
没有工作?实际上,中间Dataset
的架构也推断count
作为Int
。
dataDS.select(col("count")).printSchema;
root
|-- count: integer (nullable = true)
只要按照类型或看编译器的消息。
Dataset[FlightData]
。select
与col("count")
作为参数。 col(_)
返回Column
Dataset.select
这需要Column
返回DataFrame
which is an alias for Dataset[Row]
。Dataset.reduce
一个服用ReduceFunction[T]
和第二(T, T) => T
,其中T
是键入Dataset
,即Dataset[T]
的构造参数的两种变型。 (acc,n)=>acc+n
功能是Scala的匿名函数,因此第二个版本适用。(dataDS.select(col("count")): Dataset[Row]).reduce((acc: Row, n: Row) => acc + n): Row
其中规定的限制 - 函数接受Row
和Row
并返回Row
。Row
没有+
的方法,因此是唯一的选择,以满足
(acc: ???, n: Row) => acc + n)
是使用String
(你可以+
Any
到String
。
然而,这并不满足完整的表达 - 因此错误。dataDS.select(col("count").as[Int]).reduce((acc, n) => acc + n)
其中col("count").as[Int]
是TypedColumn[Row, Int]
和corresponding select
返回Dataset[Int]
。
同样你可以
dataDS.select(col("count")).as[Int].reduce((acc, n) => acc + n)
和
dataDS.toDF.map(_.getAs[Int]("count")).reduce((acc, n) => acc + n)
在所有情况下
.reduce((acc, n) => acc + n)
是(Int, Int) => Int
。我认为你需要做的是另一种方式。我将承担FlightData是区分阶级与上面的架构。因此,解决方案是使用地图和如下减少
val totalSum = dataDS.map(_.count).reduce(_+_) //this line replace the above error as col("count") can't be selected.
更新:当您使用选择您将数据框上工作(同样的,如果你加入),这是不是静态类型的架构,你将失去你的案件类别的特征推理的问题是不相关的数据集,其实,。例如,选择的类型将是数据帧不DataSet,以便您将无法推断出类型。
val x: DataFrame = dataDS.select('count)
val x: Dataset[Int] = dataDS.map(_.count)
另外,从这个Answer要获得TypedColumn
从列只需使用myCol.as[T]
。
我做了一个简单的例子来重现代码和数据。
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
object EntryMainPoint extends App {
//val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
.builder()
.master("local[*]")
.appName("SparkSessionZipsExample")
//.config("spark.sql.warehouse.dir", warehouseLocation)
.getOrCreate()
val someData = Seq(
Row("United States", "Romania", 15),
Row("United States", "Croatia", 1),
Row("United States", "Ireland", 344),
Row("Egypt", "United States", 15)
)
val flightDataSchema = List(
StructField("DEST_COUNTRY_NAME", StringType, true),
StructField("ORIGIN_COUNTRY_NAME", StringType, true),
StructField("count", IntegerType, true)
)
case class FlightData(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: Int)
import spark.implicits._
val dataDS = spark.createDataFrame(
spark.sparkContext.parallelize(someData),
StructType(flightDataSchema)
).as[FlightData]
val totalSum = dataDS.map(_.count).reduce(_+_) //this line replace the above error as col("count") can't be selected.
println("totalSum = " + totalSum)
dataDS.printSchema()
dataDS.show()
}
下面的输出
totalSum = 375
root
|-- DEST_COUNTRY_NAME: string (nullable = true)
|-- ORIGIN_COUNTRY_NAME: string (nullable = true)
|-- count: integer (nullable = true)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 15|
| United States| Croatia| 1|
| United States| Ireland| 344|
| Egypt| United States| 15|
+-----------------+-------------------+-----+
注意:您可以通过以下方式从数据集做一个选择
val countColumn = dataDS.select('count) //or map(_.count)
您也可以在此reduceByKey in Spark Dataset看看