我有2个Pyspark数据框df1,df2。 df1和df2都包含数百万条记录。
df1就像:
+-------------------+--------+--------+
| name|state | pincode|
+-------------------+--------+--------+
| CYBEX INTERNATION| HOUSTON| 00530 |
| FLUID POWER| MEDWAY | 02053 |
| REFINERY SYSTEMS| FRANCE | 072234 |
| K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+--------+--------+
df2就像:
+--------------------+--------+--------+
| name |state | pincode|
+--------------------+--------+--------+
|FLUID POWER PVT LTD | MEDWAY | 02053 |
| CYBEX INTERNATION | HOUSTON| 02356 |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+--------+--------+
所以,我要根据名称状态和代码来检查是否在df2上找到了df1,并且密码和输出应该在经过验证的情况下(例如,发现行将为1,否则为0,而df将为]
+-------------------+--------+--------+--------- --+
| name|state | pincode| Validated |
+-------------------+--------+--------+---------- -+
| CYBEX INTERNATION| HOUSTON| 00530 | 0 |
| FLUID POWER| MEDWAY | 02053 | 1 |
| REFINERY SYSTEMS| FRANCE | 072234 | 0 |
| K N ENTERPRISES| MUMBAI | 100010 | 0 |
+-------------------+--------+--------+------------+
在df1 Pincode第1行的第一种情况下,与任何df2 Pincode列都不匹配,因此已验证= 0在df1 Pincode第2行的第二种情况下匹配,状态也匹配,并且对于name列,我正在使用Levenshtein来匹配列名,并且最后一行经过验证= 1在第三行Pincode匹配中,但状态不匹配且已验证= 0在第4个Pincode中不存在,并且已验证= 0
我在迭代嵌套if的数据时尝试使用Pandas dataFrame进行了尝试,但是数据是如此之大,迭代不是一个好选择。
我期望使用pyspark并利用类似的并行处理来加快处理过程:
df_final = df1.withColumn('validated', if some_expression == True THEN 1,ELSE 0)
但是无法找出some_expression,以及如何在给定的列且没有任何迭代的情况下在另一个df2上检查整个df1的验证。
我经历了许多火花问题和类似问题,但没有一个对我有帮助。任何帮助将不胜感激。如果有任何不清楚的信息,请发表评论。
您需要通过以下方式解决此问题>>
这里是实现方法:
from pyspark.sql import functions as f
df1 = df1.withColumn("Validated", f.lit(1))
df2 = df2.join(df1.select(*["name", "state", "Validated"]),
on=["name", "state"], how="left")
df2 = df2.fillna(0, subset=["Validated"])