如何使用 pyspark RDD 对数据进行分区、排名和排序?

问题描述 投票:0回答:1

我正在使用pyspark,并且有一个叫做 RDD 是下面的格式。

RDD1 = (age, code, count)

我需要找到这段代码 每个年龄段的最高计数。

我是在一个 dataframe 使用 Window functionpartitioning by age:

df1 = df.withColumn("rank", rank().over(Window.partitionBy("age") 
\.orderBy(desc("count")))).sort("age", desc("count"))

df2 = df1.select("age", "code", "count", "rank").where("rank = 1")

然而,我需要只用以下方法找到同样的结果 RDD operations但我不太清楚该怎么做。任何建议都将是非常有用的

pyspark rdd
1个回答
1
投票

试试这个(对于pyspark)。

rdd1.keyBy(lambda x: x[0]).reduceByKey(lambda x,y: x if x[2] >= y[2] else y).values().collect()

Where:

  1. 使用 keyBy(lambda x: x[0]) 将原来的RDD转换为包含以下元素的对RDD。(age, (age, code, count))
  2. 使用 reduceByKey(lambda x,y: x if x[2] >= y[2] else y) 寻找每个年龄段的max(count)元素。
  3. 采取 values() 也就是 (age, code, count)

注意: 在最大值为平局的情况下,这只需要一个元素。


0
投票

可惜此时此刻 window functionsPARTITION BY 子句将所有的数据移动到一个单一的分区,所以它是特别有用的,如果你有大的数据集。

如果你不介意在那里使用开发者API,你可以尝试一下 RDDFunctions.sliding 但需要人工处理。

import org.apache.spark.mllib.rdd.RDDFunctions._

val first = rdd.first match {
  case NameValue(name, value) => NameValueWithLag(name, value, value)
}

sc.parallelize(Seq(first)).union(rdd
  .sliding(2)
  .map(a => NameValueWithLag(a(1).name, a(1).value, a(1).value - a(0).value)))

用提供的权重随机分割该RDD。

最简单的方法是将RDD转换为数据框,并执行操作将其转换为RDD。

数据框转RDD

之前 Spark 2.0, spark_df.map 将化名为 spark_df.rdd.map(). 在Spark 2.0中,你必须明确地调用 .rdd 第一 spark_df.rdd.map().

© www.soinside.com 2019 - 2024. All rights reserved.