在Pyspark中使用contains和udf的问题:AttributeError:'NoneType'对象没有属性'lower'

问题描述 投票:0回答:1

我有2个数据框,df1和df2:

df1:

+-------------------+----------+------------+
|         df1.name  |df1.state | df1.pincode|
+-------------------+----------+------------+
|  CYBEX INTERNATION| HOUSTON  | 00530      |
|        FLUID POWER| MEDWAY   | 02053      |
|   REFINERY SYSTEMS| FRANCE   | 072234     |
|    K N ENTERPRISES| MUMBAI   | 100010     |
+-------------------+----------+------------+

df2:

+--------------------+------------+------------+
|           df2.name |df2.state   | df2.pincode|
+--------------------+------------+------------+
|FLUID POWER PVT LTD | MEDWAY     | 02053      |
|  CYBEX INTERNATION | HOUSTON    | 02356      |
|REFINERY SYSTEMS LTD| MUMBAI     | 072234     |
+--------------------+------------+------------+

我的工作是验证df2中是否存在df1中的数据,如果它确实有效= 1,否则有效= 0。现在,我在条件,状态和Pincode上运行一些联接操作,并且为了进行字符串比较,我首先将字符串转换为小写字母,然后进行排序并使用Python序列匹配。预期输出为:

+-------------------+-------------------+----------+------------+------------+
|           df1.name|df2.name           |df1.state | df1.pincode|  Validated |
+-------------------+-------------------+----------+------------+------------+
|  CYBEX INTERNATION| NULL              |HOUSTON   | 00530      |     0      |
|        FLUID POWER|FLUID POWER PVT LTD|MEDWAY    | 02053      |     1      |
|   REFINERY SYSTEMS| NULL              |FRANCE    | 072234     |     0      |
|    K N ENTERPRISES| NULL              |MUMBAI    | 100010     |     0      |
+-------------------+-------------------+----------+------------+------------+

我有我的代码:

from pyspark.sql.types import *
from difflib import SequenceMatcher
from pyspark.sql.functions import col,when,lit,udf

contains = udf(lambda s, q: SequenceMatcher(None,"".join(sorted(s.lower())), "".join(sorted(q.lower()))).ratio()>=0.9, BooleanType())
join_condition = ((col("df1.pincode") == col("df2.pincode")) & (col("df1.state") == col("df2.state")))
result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left").where(contains(col("df1.name"), col("df2.name")))
result = result_df.select("df1.*",when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))
result.show()

但是输出给了我AttributeError:'NoneType'对象没有属性'lower']我知道不匹配的列为Null,这就是为什么s.lower()和p.lower()无法正常工作,而是如何解决此问题的原因。我只希望包含这个条件,然后执行过滤过程。

而且,我需要在结果中包含df2.name列,因为我在列表中提供了col名称:

cols = ["df1.name","df2.name","df1.state","df1.pincode"]
result = result_df.select(*cols,when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))

但是我又收到一个错误:SyntaxError:仅命名参数可以跟随* expression

任何帮助将不胜感激。谢谢。

apache-spark pyspark pyspark-sql pyspark-dataframes
1个回答
0
投票

在UDF中,您正在使用.lower方法。此方法是str对象的方法。显然,在您的数据框中,您在df1.name中的某些位置None值。

用这样的东西替换当前的UDF以处理None:

contains = udf(
    lambda s, q: SequenceMatcher(
        None,
        "".join(sorted((s or "").lower())), 
        "".join(sorted((q or "").lower()))
    ).ratio()>=0.9, BooleanType()
)
© www.soinside.com 2019 - 2024. All rights reserved.