Byspark的反团体组织/ R适用

问题描述 投票:1回答:2

我是进入pyspark世界的R程序员,已经掌握了许多基本技巧,但我仍在苦苦挣扎的是我将要应用的东西或for循环的基本东西。

在这种情况下,我正在尝试为ID计算“ anti-groupby”。基本上,想法是先查看该ID的填充,然后查看该ID的填充,然后将这两个值放在同一行。使用groupby轻松获取该ID的人口,然后将其连接到以new_id作为唯一列的数据集即可。

这就是我在R中的做法:

anti_group <- function(id){
    tr <- sum(subset(df1, new_id!=id)$total_1)
    to <- sum(subset(df1, new_id!=id)$total_2)
    54 * tr / to
  }
  test$other.RP54 <- sapply(test$new_id, anti_group  )

我将如何在pyspark中做到这一点?

谢谢!

编辑:

#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
|  2|   10|
|  2|   90|
|  3|   20|
|  3|   10|
|  4|    2|
|  4|    5|
+---+-----+

然后是一些创建最终数据帧的函数,如下所示:

+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  1|           70|               137|
|  2|          100|               107|
|  3|           30|               177|
|  4|            7|               200|
+---+-------------+------------------+

r pyspark group-by sapply pyspark-dataframes
2个回答
0
投票

我认为您可以通过两个步骤来做到这一点:首先,您通过id求和,然后将总数减去该id的值。

我的想法有点像group_by(id) %>% summarise(x = sum(x)) %>% mutate(y = sum(x) - x)中的dplyr

我建议的解决方案基于Window函数。未经测试:

import pyspark.sql.functions as psf
import pyspark.sql.window as psw

w = psw.Window.orderBy("id")
df_id = df.groupBy("id").agg(psf.sum("x").alias("x"))
df_id = (df_id
          .withColumn("y",psf.sum("x").over(w))
          .withColumn('y', psf.col('y') - psf.col('x'))
        )

0
投票

所以没有内置函数可以复制groupBy函数,但是您可以通过使用case(when/otherwise clause)创建新列来创建group and anti-group,然后轻松地完成此操作groupBy上的new column

#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
|  2|   10|
|  2|   90|
|  3|   20|
|  3|   10|
|  4|    2|
|  4|    5|
+---+-----+

from pyspark.sql import functions as F
df.withColumn("anti_id_1", F.when(F.col("id")==1, F.lit('1')).otherwise(F.lit('Not_1')))\
  .groupBy("anti_id_1").agg(F.sum("value").alias("sum")).show()

+---------+---+
|anti_id_1|sum|
+---------+---+
|        1| 70|
|    Not_1|137|
+---------+---+

UPDATE:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

w1=Window().partitionBy("id")
w=Window().partitionBy()
df.withColumn("grouped_total",F.sum("value").over(w1))\
  .withColumn("anti_grouped_total", (F.sum("value").over(w))-F.col("grouped_total"))\
  .groupBy("id").agg(F.first("grouped_total").alias("grouped_total"),\
                     F.first("anti_grouped_total").alias("anti_grouped_total"))\
  .drop("value").show()


+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  1|           70|               137|
|  3|           30|               177|
|  2|          100|               107|
|  4|            7|               200|
+---+-------------+------------------+
© www.soinside.com 2019 - 2024. All rights reserved.