我有一个PySpark数据框,其中第二列是列表列表。
以下是我拥有的PySpark数据框:
+---+------------------------------+
|A |B |
+---+------------------------------+
|a |[[95.0], [25.0, 25.0], [40.0]]|
|a |[[95.0], [20.0, 80.0]] |
|a |[[95.0], [25.0, 75.0]] |
|b |[[95.0], [25.0, 75.0]] |
|b |[[95.0], [12.0, 88.0]] |
+---+------------------------------+
在此示例中,我试图展平数组(在第二列中),对数组进行排序并删除随后的numpy数组中的最大元素。
以下是我期望的输出:
+---+------------------------------+
|A |B |
+---+------------------------------+
|a |[25.0, 25.0, 40.0] |
|a |[20.0, 80.0] |
|a |[25.0, 75.0] |
|b |[25.0, 75.0] |
|b |[12.0, 88.0] |
+---+------------------------------+
下面是我目前拥有的udf:
def remove_highest(col):
return np.sort( np.asarray([item for sublist in col for item in sublist]) )[:-1]
udf_remove_highest = F.udf( remove_highest , T.ArrayType() )
当我尝试创建此udf时收到以下错误:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-20-6984c2f41293> in <module>()
2 return np.sort( np.asarray([item for sublist in col for item in sublist]) )[:-1]
3
----> 4 udf_remove_highest = F.udf( remove_highest , T.ArrayType() )
TypeError: __init__() missing 1 required positional argument: 'elementType'
[如果可能,我希望使用numpy数组的udf。我如何实现上述目标?
我不推荐UDF,但是如果您想让代码工作,只需在ArrayType内添加FloatType
udf_remove_highest = F.udf( remove_highest , T.ArrayType(T.FloatType()) )
如果保留重复对您而言并不重要,那么您可以使用此代码,该代码使用pyspark函数,并且会更快,更高效。
将为每行动态删除最大值。
df1=df.withColumn("B",F.flatten("B")).withColumn("B", F.sort_array("B")).withColumn("Max",F.array((F.array_max("B"))))
df1.withColumn("B",F.array_except("B","Max")).drop("Max").show()
+---+------------+
| A| B|
+---+------------+
| a|[25.0, 40.0]|
| a|[20.0, 80.0]|
| a|[25.0, 75.0]|
| a|[25.0, 75.0]|
| a|[12.0, 88.0]|
+---+------------+