我是进入pyspark世界的R程序员,已经掌握了许多基本技巧,但我仍在苦苦挣扎的是我将要应用的东西或for循环的基本东西。
在这种情况下,我正在尝试为ID计算“ anti-groupby”。基本上,想法是先查看该ID的填充,然后查看该ID的填充,然后将这两个值放在同一行。使用groupby轻松获取该ID的人口,然后将其连接到以new_id作为唯一列的数据集即可。
这就是我在R中的做法:
anti_group <- function(id){
tr <- sum(subset(df1, new_id!=id)$total_1)
to <- sum(subset(df1, new_id!=id)$total_2)
54 * tr / to
}
test$other.RP54 <- sapply(test$new_id, anti_group )
我将如何在pyspark中做到这一点?
谢谢!
编辑:
#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
| 1| 40|
| 1| 30|
| 2| 10|
| 2| 90|
| 3| 20|
| 3| 10|
| 4| 2|
| 4| 5|
+---+-----+
然后是一些创建最终数据帧的函数,如下所示:
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
| 1| 70| 137|
| 2| 100| 107|
| 3| 30| 177|
| 4| 7| 200|
+---+-------------+------------------+
我认为您可以通过两个步骤来做到这一点:首先,您通过id求和,然后将总数减去该id的值。
我的想法有点像group_by(id) %>% summarise(x = sum(x)) %>% mutate(y = sum(x) - x)
中的dplyr
我建议的解决方案基于Window
函数。未经测试:
import pyspark.sql.functions as psf
import pyspark.sql.window as psw
w = psw.Window.orderBy("id")
df_id = df.groupBy("id").agg(psf.sum("x").alias("x"))
df_id = (df_id
.withColumn("y",psf.sum("x").over(w))
.withColumn('y', psf.col('y') - psf.col('x'))
)
所以没有内置函数可以复制groupBy函数,但是您可以通过使用case(when/otherwise clause)
创建新列来创建group and anti-group,然后轻松地完成此操作groupBy
上的new column
。
#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
| 1| 40|
| 1| 30|
| 2| 10|
| 2| 90|
| 3| 20|
| 3| 10|
| 4| 2|
| 4| 5|
+---+-----+
from pyspark.sql import functions as F
df.withColumn("anti_id_1", F.when(F.col("id")==1, F.lit('1')).otherwise(F.lit('Not_1')))\
.groupBy("anti_id_1").agg(F.sum("value").alias("sum")).show()
+---------+---+
|anti_id_1|sum|
+---------+---+
| 1| 70|
| Not_1|137|
+---------+---+
UPDATE:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w1=Window().partitionBy("id")
w=Window().partitionBy()
df.withColumn("grouped_total",F.sum("value").over(w1))\
.withColumn("anti_grouped_total", (F.sum("value").over(w))-F.col("grouped_total"))\
.groupBy("id").agg(F.first("grouped_total").alias("grouped_total"),\
F.first("anti_grouped_total").alias("anti_grouped_total"))\
.drop("value").show()
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
| 1| 70| 137|
| 3| 30| 177|
| 2| 100| 107|
| 4| 7| 200|
+---+-------------+------------------+