Taking a first stab at PySpark. Here's what I have so far:
internal_pct_by_day = df_resampled.groupBy('dt_resampled', 'traffic_source').count()
internal_pct_by_day.show(5)
+-------------------+--------------+-----+
| dt_resampled|traffic_source|count|
+-------------------+--------------+-----+
|2016-06-13 20:00:00| 2| 320|
|2016-06-13 20:00:00| 1| 84|
|2016-06-14 20:00:00| 2| 66|
|2016-06-14 20:00:00| 3| 4|
|2016-06-13 20:00:00| 3| 1|
+-------------------+--------------+-----+
I originally had multiple records per unique timestamp and resampled them by day. My table now shows that 320 visitors came via traffic source 2 on the 13th, 4 visitors came via traffic source 3 on the 14th, and so on.
I'd now like to add a column showing what percentage of each day's visitors came from each source.
The ideal result would look something like this:
+-------------------+--------------+-----+-----+-------+
|       dt_resampled|traffic_source|count|total|percent|
+-------------------+--------------+-----+-----+-------+
|2016-06-13 20:00:00|             2|  320|  405|    79%|
|2016-06-13 20:00:00|             1|   84|  405|    20%|
|2016-06-14 20:00:00|             2|   66|   70|    94%|
|2016-06-14 20:00:00|             3|    4|   70|     6%|
|2016-06-13 20:00:00|             3|    1|  405|     1%|
+-------------------+--------------+-----+-----+-------+
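(The per-day totals here are 320 + 84 + 1 = 405 for the 13th and 66 + 4 = 70 for the 14th; percent is just count divided by that day's total.)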
I mashed together some code from other Stack Overflow posts and came up with this:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql import Window

window = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
grouped_df = df_resampled\
    .groupBy('dt_resampled', 'traffic_source')\
    .agg(F.sum('traffic_source').alias('sum_traffic_source'))\
    .withColumn('total', F.sum(col('sum_traffic_source')).over(window))\
    .withColumn('percent', col('sum_traffic_source')*100/col('total'))
grouped_df.show(5)
+-------------------+--------------+------------------+-----+-------------------+
| dt_resampled|traffic_source|sum_traffic_source|total| percent|
+-------------------+--------------+------------------+-----+-------------------+
|2016-06-13 20:00:00| 2| 640| 896| 71.42857142857143|
|2016-06-13 20:00:00| 1| 84| 896| 9.375|
|2016-06-14 20:00:00| 2| 132| 896| 14.732142857142858|
|2016-06-14 20:00:00| 3| 12| 896| 1.3392857142857142|
|2016-06-13 20:00:00| 3| 3| 896|0.33482142857142855|
+-------------------+--------------+------------------+-----+-------------------+
I can't quite seem to get what I'm after. Any help would be much appreciated!
Step 0: Import the relevant functions.
from pyspark.sql.functions import to_timestamp, col, round
Step 1: Create the relevant DataFrame.
valuesCol = [('2016-06-13 20:00:00',2,320),('2016-06-13 20:00:00',1,84),('2016-06-14 20:00:00',2,66),
             ('2016-06-14 20:00:00',3,4),('2016-06-13 20:00:00',3,1)]
df = sqlContext.createDataFrame(valuesCol,['dt_resampled','traffic_source','count'])
# Cast the string to proper timestamp
df = df.withColumn('dt_resampled',to_timestamp(col('dt_resampled'), 'yyyy-MM-dd HH:mm:ss'))
df.show()
+-------------------+--------------+-----+
| dt_resampled|traffic_source|count|
+-------------------+--------------+-----+
|2016-06-13 20:00:00| 2| 320|
|2016-06-13 20:00:00| 1| 84|
|2016-06-14 20:00:00| 2| 66|
|2016-06-14 20:00:00| 3| 4|
|2016-06-13 20:00:00| 3| 1|
+-------------------+--------------+-----+
df.printSchema()
root
|-- dt_resampled: timestamp (nullable = true)
|-- traffic_source: long (nullable = true)
|-- count: long (nullable = true)
Step 2: Calculate the total traffic per day. To use familiar SQL syntax for this DataFrame operation, we must first register the DataFrame as a temporary SQL view, which the first line below does:
df.createOrReplaceTempView('table_view')
df = sqlContext.sql(
'select dt_resampled, traffic_source, count, sum(count) over (partition by dt_resampled) as total_per_day from table_view'
)
df = df.withColumn('percent', round(col('count')/col('total_per_day'),4))
df.show()
+-------------------+--------------+-----+-------------+-------+
| dt_resampled|traffic_source|count|total_per_day|percent|
+-------------------+--------------+-----+-------------+-------+
|2016-06-14 20:00:00| 2| 66| 70| 0.9429|
|2016-06-14 20:00:00| 3| 4| 70| 0.0571|
|2016-06-13 20:00:00| 2| 320| 405| 0.7901|
|2016-06-13 20:00:00| 1| 84| 405| 0.2074|
|2016-06-13 20:00:00| 3| 1| 405| 0.0025|
+-------------------+--------------+-----+-------------+-------+
The precision of the percent column can be changed by specifying the second argument of round() accordingly.
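For completeness, the same per-day total can also be computed without SQL, through a window partitioned by day in the DataFrame API. This is a minimal sketch assuming the df built in Step 1; the unpartitioned window in the question's attempt is exactly why every row there shared one overall total.
from pyspark.sql import Window
from pyspark.sql import functions as F

# Partition by day so that sum() totals within each dt_resampled
# group rather than over the entire DataFrame
day_window = Window.partitionBy('dt_resampled')
df_pct = df.withColumn('total_per_day', F.sum('count').over(day_window))\
    .withColumn('percent', F.round(F.col('count')/F.col('total_per_day'), 4))
df_pct.show()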
Using a pandas groupby, like this:
# Map each row's day onto that day's total count, then take the share
df['total'] = df['dt_resampled'].map(df.groupby('dt_resampled')['count'].sum())
df['percent'] = df['count']/df['total']*100
dt_resampled traffic_source count total percent
0 2016-06-13 20:00:00 2 320 405 79.012346
1 2016-06-13 20:00:00 1 84 405 20.740741
2 2016-06-14 20:00:00 2 66 70 94.285714
3 2016-06-14 20:00:00 3 4 70 5.714286
4 2016-06-13 20:00:00 3 1 405 0.246914
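Equivalently, a groupby().transform sketch (same assumptions) avoids the intermediate map:
# transform('sum') broadcasts each day's total back onto every row
df['total'] = df.groupby('dt_resampled')['count'].transform('sum')
df['percent'] = df['count']/df['total']*100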