在Python中标记初始数据帧内的代码结果

问题描述 投票:0回答:1

目前我正在对包含交易信息的数据库执行计算。这是一个大数据集,消耗大量资源,并且刚刚面临如何使用优化我当前的解决方案的问题。

我的初始数据框如下所示:

Name    ID     ContractDate LoanSum Status
A       ID1    2022-10-10   10      Closed 
A       ID1    2022-10-15   13      Active
A       ID1    2022-10-30   20      Active
B       ID2    2022-11-05   30      Active
C       ID3    2022-12-10   40      Closed
C       ID3    2022-12-12   43      Active
C       ID3    2022-12-19   46      Active
D       ID4    2022-12-10   10      Closed
D       ID4    2022-12-12   30      Active

我必须创建一个数据框,其中包含发放给特定借款人(按 ID 分组)的所有贷款,其中两笔贷款(分配给一个唯一 ID)之间的天数小于 15,发放给一个特定借款人的贷款金额之间的差异为小于或等于 3。

我的解决方案:

from pyspark.sql import functions as f
from pyspark.sql import Window

df = spark.createDataFrame(data).toDF('Name','ID','ContractDate','LoanSum','Status')
df.show()

cols = df.columns
w = Window.partitionBy('ID').orderBy('ContractDate')

new_df = df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
  .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w)) \
  .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) < 15 and LoanSum - PreviousLoanSum <= 3')) \
  .withColumn('Target', f.col('Target') | f.lead('Target').over(w)) \
  .filter('Target == True') \
  .select(cols[0], *cols[1:])

+----+---+------------+-------+------+
|Name| ID|ContractDate|LoanSum|Status|
+----+---+------------+-------+------+
|   A|ID1|  2022-10-10|     10|Closed|
|   A|ID1|  2022-10-15|     13|Active|
|   C|ID3|  2022-12-10|     40|Closed|
|   C|ID3|  2022-12-12|     43|Active|
|   C|ID3|  2022-12-19|     46|Active|
+----+---+------------+-------+------+

如您所见,我的结果存储在单独的表中。我的下一个目标是从初始数据帧“df”中删除数据帧“new_df”,以便处理相关行。

如果我使用这个明显的解决方案,系统运行速度非常慢,尤其是当我必须在每一步中逐一减去数据帧时:

df_sub = df.subtract(new_df)

我的问题:是否有可能(如果是,那么如何)不创建新的数据框,而是创建第一个数据框 df 内的数据框 new_df 中包含的单独行?也许通过创建一个新列以特殊方式标记行,以便过滤稍后进一步分析所需的行?

提前谢谢您!

python pyspark subset data-manipulation
1个回答
0
投票

你可以通过两种方式做到这一点

  1. 使用左反连接
  2. 不要创建另一个表,而是在同一个表中添加您的
    target
    标志

选项 1 -

左反连接 - 保留左表中与右表中没有任何匹配行的行。

new_df = df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
  .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w)) \
  .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) < 15 and LoanSum - PreviousLoanSum <= 3')) \
  .withColumn('RemoverRowsFlag', f.col('Target') | f.lead('Target').over(w)) \
  .filter('RemoverRowsFlag == True') \
  .select(cols[0], *cols[1:],'RemoverRowsFlag')

df = df.join(new_df,on=[*cols], how='left_anti')
df.show()

输出

+----+---+------------+-------+------+
|Name| ID|ContractDate|LoanSum|Status|
+----+---+------------+-------+------+
|   A|ID1|  2022-10-30|     20|Active|
|   B|ID2|  2022-11-05|     30|Active|
|   D|ID4|  2022-12-10|     10|Closed|
|   D|ID4|  2022-12-12|     30|Active|
+----+---+------------+-------+------+

选项 2 -

这非常简单,在同一个表中添加列并过滤它。

df = df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
  .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w)) \
  .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) < 15 and LoanSum - PreviousLoanSum <= 3')) \
  .withColumn('RemoverRowsFlag', f.col('Target') | f.lead('Target').over(w)) \
  .select(*cols,'RemoverRowsFlag')

df = df.filter('RemoverRowsFlag == True')
df.show()

输出

+----+---+------------+-------+------+---------------+
|Name| ID|ContractDate|LoanSum|Status|RemoverRowsFlag|
+----+---+------------+-------+------+---------------+
|   A|ID1|  2022-10-10|     10|Closed|           true|
|   A|ID1|  2022-10-15|     13|Active|           true|
|   C|ID3|  2022-12-10|     40|Closed|           true|
|   C|ID3|  2022-12-12|     43|Active|           true|
|   C|ID3|  2022-12-19|     46|Active|           true|
+----+---+------------+-------+------+---------------+
© www.soinside.com 2019 - 2024. All rights reserved.