在 Spark SQL 中计算运行总和

问题描述 投票:0回答:1

我正在研究一个逻辑,我需要根据每日扫描计数计算总扫描、最后 5 天扫描、月 2 天扫描。截至今天,我每天都会对每日扫描计数进行汇总,但现在数据量使得计算变得困难。作为一种新方法,我正在考虑使用运行总和,但我无法弄清楚如何计算总扫描的运行总和,即今天的总扫描将是 - 最后的总扫描值 + 今天的扫描计数(其中最后的总扫描也可以是 1)一个月或两个月前)

ProcessName          DailyScan.    Date
 NewInsurance.         8000        04/12/2024
 InsuranceRenewal.      4500.       04/12/2024
 Fraud Detection.       28.           04/12/2024
 Policy Withdrawn.      100.          04/01/2024
 NewInsurance.           2100.         04/13/2024
 New Insurance           400           04/14/2024
InsuranceRenewal         500           04/14/2024
InsuranceRenewal         500           04/18/2024
New Insurance           500           04/18/2024

所需输出 - 假设我在 04/18/2024 执行查询

ProcessName   TotalScan Last5DayScan  Month2DayScan   DailyScan     Date 
NewInsurance    8000     8000          8000              8000        04/12/2024
NewInsurance    10100     10100        10100             2100        04/13/2024
NewInsurance    10500     10500        10500             400         04/14/2024
NewInsurance    11000      900         11000             500         04/18/2024

我每天对整个数据集进行 sum(dailyscan)(在将源表与日历表连接并按 ProcessName 和 CalendarDate 分组之后)以获得 TotalScan。这就是我的输出,但我确信会有更好、更有效的方法来做到这一点。有什么想法吗?

sql pyspark apache-spark-sql amazon-redshift
1个回答
0
投票

为此,您必须使用 Window.partitionBy 按“ProcessName”列对数据进行分区,然后按“日期”排序/排序,最后对该分区窗口求和。

from pyspark.sql import Window
from pyspark.sql import functions as F

columns=["ProcessName", "DailyScan", "Date"]
data=[
    ("NewInsurance","8000","04/12/2024"),
    ("InsuranceRenewal","4500","04/12/2024"),
    ("FraudDetection","28","04/12/2024"),
    ("PolicyWithdrawn","100","04/01/2024"),
    ("NewInsurance","2100","04/13/2024"),
    ("NewInsurance","400","04/14/2024"),
    ("InsuranceRenewal","500","04/14/2024"),
    ("InsuranceRenewal","500","04/18/2024"),
    ("NewInsurance","500","04/18/2024"),
]

df = spark.createDataFrame(data, columns)

df = df.withColumn("Date", F.to_date(df.Date, "MM/dd/yyyy"))

w = Window.partitionBy("ProcessName").orderBy("Date")

df = df.withColumn("DailyScan", F.sum("DailyScan").over(w))

输出:

+----------------+---------+----------+
|ProcessName     |DailyScan|Date      |
+----------------+---------+----------+
|FraudDetection  |28.0     |2024-04-12|
|InsuranceRenewal|4500.0   |2024-04-12|
|InsuranceRenewal|5000.0   |2024-04-14|
|InsuranceRenewal|5500.0   |2024-04-18|
|NewInsurance    |8000.0   |2024-04-12|
|NewInsurance    |10100.0  |2024-04-13|
|NewInsurance    |10500.0  |2024-04-14|
|NewInsurance    |11000.0  |2024-04-18|
|PolicyWithdrawn |100.0    |2024-04-01|
+----------------+---------+----------+
© www.soinside.com 2019 - 2024. All rights reserved.