我有2个数据框,df1和df2:
df1:
+-------------------+----------+------------+
| df1.name |df1.state | df1.pincode|
+-------------------+----------+------------+
| CYBEX INTERNATION| HOUSTON | 00530 |
| FLUID POWER| MEDWAY | 02053 |
| REFINERY SYSTEMS| FRANCE | 072234 |
| K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+----------+------------+
df2:
+--------------------+------------+------------+
| df2.name |df2.state | df2.pincode|
+--------------------+------------+------------+
|FLUID POWER PVT LTD | MEDWAY | 02053 |
| CYBEX INTERNATION | HOUSTON | 02356 |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+------------+------------+
我的工作是验证df2中是否存在df1中的数据,如果它确实有效= 1,否则有效= 0。现在,我在条件,状态和Pincode上运行一些联接操作,并且为了进行字符串比较,我首先将字符串转换为小写字母,然后进行排序并使用Python序列匹配。预期输出为:
+-------------------+-------------------+----------+------------+------------+
| df1.name|df2.name |df1.state | df1.pincode| Validated |
+-------------------+-------------------+----------+------------+------------+
| CYBEX INTERNATION| NULL |HOUSTON | 00530 | 0 |
| FLUID POWER|FLUID POWER PVT LTD|MEDWAY | 02053 | 1 |
| REFINERY SYSTEMS| NULL |FRANCE | 072234 | 0 |
| K N ENTERPRISES| NULL |MUMBAI | 100010 | 0 |
+-------------------+-------------------+----------+------------+------------+
我有我的代码:
from pyspark.sql.types import *
from difflib import SequenceMatcher
from pyspark.sql.functions import col,when,lit,udf
contains = udf(lambda s, q: SequenceMatcher(None,"".join(sorted(s.lower())), "".join(sorted(q.lower()))).ratio()>=0.9, BooleanType())
join_condition = ((col("df1.pincode") == col("df2.pincode")) & (col("df1.state") == col("df2.state")))
result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left").where(contains(col("df1.name"), col("df2.name")))
result = result_df.select("df1.*",when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))
result.show()
但是输出给了我AttributeError:'NoneType'对象没有属性'lower']我知道不匹配的列为Null,这就是为什么s.lower()和p.lower()无法正常工作,而是如何解决此问题的原因。我只希望包含这个条件,然后执行过滤过程。
而且,我需要在结果中包含df2.name列,因为我在列表中提供了col名称:
cols = ["df1.name","df2.name","df1.state","df1.pincode"]
result = result_df.select(*cols,when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))
但是我又收到一个错误:SyntaxError:仅命名参数可以跟随* expression
任何帮助将不胜感激。谢谢。
在UDF中,您正在使用.lower
方法。此方法是str
对象的方法。显然,在您的数据框中,您在df1.name
中的某些位置None
值。
用这样的东西替换当前的UDF以处理None:
contains = udf(
lambda s, q: SequenceMatcher(
None,
"".join(sorted((s or "").lower())),
"".join(sorted((q or "").lower()))
).ratio()>=0.9, BooleanType()
)