How to get the percentage of each category on a given date with PySpark

Question · Votes: 0 · Answers: 2

Giving PySpark a try and struggling a bit.

Here's what I have so far:

internal_pct_by_day = df_resampled.groupBy('dt_resampled', 'traffic_source').count()
internal_pct_by_day.show(5)

+-------------------+--------------+-----+
|       dt_resampled|traffic_source|count|
+-------------------+--------------+-----+
|2016-06-13 20:00:00|             2|  320|
|2016-06-13 20:00:00|             1|   84|
|2016-06-14 20:00:00|             2|   66|
|2016-06-14 20:00:00|             3|    4|
|2016-06-13 20:00:00|             3|    1|
+-------------------+--------------+-----+

I originally had multiple records at distinct times and resampled them by day. Now my table shows that 320 visitors came through traffic source 2 on the 13th, 4 visitors came through traffic source 3 on the 14th, and so on.

I now want to add a column showing what percentage of each day's visitors came from each source.

The ideal result would look like this:

+-------------------+--------------+-----+-----+-------+
|       dt_resampled|traffic_source|count|total|percent|
+-------------------+--------------+-----+-----+-------+
|2016-06-13 20:00:00|             2|  320|  405|    79%|
|2016-06-13 20:00:00|             1|   84|  405|    20%|
|2016-06-14 20:00:00|             2|   66|   70|    94%|
|2016-06-14 20:00:00|             3|    4|   70|     6%|
|2016-06-13 20:00:00|             3|    1|  405|     1%|
+-------------------+--------------+-----+-----+-------+

I cobbled together some code from elsewhere on Stack Overflow and came up with this:

from pyspark.sql.functions import rank,sum,col
from pyspark.sql import Window

window = Window.rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

grouped_df = df_resampled\
.groupBy('dt_resampled', 'traffic_source')\
.agg(sum('traffic_source').alias('sum_traffic_source'))\
.withColumn('total', sum(col('sum_traffic_source')).over(window))\
.withColumn('percent',col('sum_traffic_source')*100/col('total'))

grouped_df.show(5)

+-------------------+--------------+------------------+-----+-------------------+
|       dt_resampled|traffic_source|sum_traffic_source|total|            percent|
+-------------------+--------------+------------------+-----+-------------------+
|2016-06-13 20:00:00|             2|               640|  896|  71.42857142857143|
|2016-06-13 20:00:00|             1|                84|  896|              9.375|
|2016-06-14 20:00:00|             2|               132|  896| 14.732142857142858|
|2016-06-14 20:00:00|             3|                12|  896| 1.3392857142857142|
|2016-06-13 20:00:00|             3|                 3|  896|0.33482142857142855|
+-------------------+--------------+------------------+-----+-------------------+

I can't quite seem to get what I want. Any help would be much appreciated!

python apache-spark pyspark
2 Answers

Score: 1

Step 0: Import the relevant functions

from pyspark.sql.functions import to_timestamp, col, round

Step 1: Create the relevant DataFrame.

valuesCol = [('2016-06-13 20:00:00',2,320),('2016-06-13 20:00:00',1,84),('2016-06-14 20:00:00',2,66),
             ('2016-06-14 20:00:00',3,4),('2016-06-13 20:00:00',3,1)]
df = sqlContext.createDataFrame(valuesCol,['dt_resampled','traffic_source','count'])
# Cast the string to proper timestamp
df = df.withColumn('dt_resampled',to_timestamp(col('dt_resampled'), 'yyyy-MM-dd HH:mm:ss'))    
df.show()
+-------------------+--------------+-----+
|       dt_resampled|traffic_source|count|
+-------------------+--------------+-----+
|2016-06-13 20:00:00|             2|  320|
|2016-06-13 20:00:00|             1|   84|
|2016-06-14 20:00:00|             2|   66|
|2016-06-14 20:00:00|             3|    4|
|2016-06-13 20:00:00|             3|    1|
+-------------------+--------------+-----+
df.printSchema()
root
 |-- dt_resampled: timestamp (nullable = true)
 |-- traffic_source: long (nullable = true)
 |-- count: long (nullable = true)

Step 2: Compute the total traffic per day. To apply familiar SQL syntax to DataFrame operations, we must first register the DataFrame as a temporary SQL view, which the first line below does:

df.createOrReplaceTempView('table_view')
df=sqlContext.sql(
    'select dt_resampled, traffic_source, count, sum(count) over (partition by dt_resampled) as total_per_day from table_view'
)
df = df.withColumn('percent', round(col('count')/col('total_per_day'),4))
df.show()
+-------------------+--------------+-----+-------------+-------+
|       dt_resampled|traffic_source|count|total_per_day|percent|
+-------------------+--------------+-----+-------------+-------+
|2016-06-14 20:00:00|             2|   66|           70| 0.9429|
|2016-06-14 20:00:00|             3|    4|           70| 0.0571|
|2016-06-13 20:00:00|             2|  320|          405| 0.7901|
|2016-06-13 20:00:00|             1|   84|          405| 0.2074|
|2016-06-13 20:00:00|             3|    1|          405| 0.0025|
+-------------------+--------------+-----+-------------+-------+

The precision of percent can be changed by adjusting the second argument of the round() function accordingly.


Score: 0

Use groupby, like this (note: this answer uses pandas, not PySpark):

df['total'] = df['dt_resampled'].map(df.groupby('dt_resampled')['count'].sum()) 
df['percent'] = df['count']/df['total']*100

          dt_resampled  traffic_source  count  total    percent
0  2016-06-13 20:00:00               2    320    405  79.012346
1  2016-06-13 20:00:00               1     84    405  20.740741
2  2016-06-14 20:00:00               2     66     70  94.285714
3  2016-06-14 20:00:00               3      4     70   5.714286
4  2016-06-13 20:00:00               3      1    405   0.246914
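The `map` trick above can also be written with `groupby(...).transform('sum')`, which broadcasts each group's total directly back onto its rows. A self-contained pandas sketch with the question's sample data:

```python
import pandas as pd

df = pd.DataFrame({
    'dt_resampled': ['2016-06-13 20:00:00', '2016-06-13 20:00:00',
                     '2016-06-14 20:00:00', '2016-06-14 20:00:00',
                     '2016-06-13 20:00:00'],
    'traffic_source': [2, 1, 2, 3, 3],
    'count': [320, 84, 66, 4, 1]})

# transform('sum') returns a Series aligned to df's index, one total per row
df['total'] = df.groupby('dt_resampled')['count'].transform('sum')
df['percent'] = df['count'] / df['total'] * 100
```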
