我正在使用pyspark,并且有一个叫做 RDD
是下面的格式。
RDD1 = (age, code, count)
我需要找到这段代码 每个年龄段的最高计数。
我是在一个 dataframe
使用 Window function
和 partitioning by age
:
df1 = df.withColumn("rank", rank().over(Window.partitionBy("age")
\.orderBy(desc("count")))).sort("age", desc("count"))
df2 = df1.select("age", "code", "count", "rank").where("rank = 1")
然而,我需要只用以下方法找到同样的结果 RDD operations
但我不太清楚该怎么做。任何建议都将是非常有用的
试试这个(对于pyspark)。
rdd1.keyBy(lambda x: x[0]).reduceByKey(lambda x,y: x if x[2] >= y[2] else y).values().collect()
Where:
keyBy(lambda x: x[0])
将原来的RDD转换为包含以下元素的对RDD。(age, (age, code, count))
reduceByKey(lambda x,y: x if x[2] >= y[2] else y)
寻找每个年龄段的max(count)元素。values()
也就是 (age, code, count)
注意: 在最大值为平局的情况下,这只需要一个元素。
可惜此时此刻 window functions
橆 PARTITION BY
子句将所有的数据移动到一个单一的分区,所以它是特别有用的,如果你有大的数据集。
如果你不介意在那里使用开发者API,你可以尝试一下 RDDFunctions.sliding
但需要人工处理。
import org.apache.spark.mllib.rdd.RDDFunctions._
val first = rdd.first match {
case NameValue(name, value) => NameValueWithLag(name, value, value)
}
sc.parallelize(Seq(first)).union(rdd
.sliding(2)
.map(a => NameValueWithLag(a(1).name, a(1).value, a(1).value - a(0).value)))
最简单的方法是将RDD转换为数据框,并执行操作将其转换为RDD。
数据框转RDD
之前 Spark 2.0,
spark_df.map
将化名为 spark_df.rdd.map()
. 在Spark 2.0中,你必须明确地调用 .rdd
第一 spark_df.rdd.map()
.