I'm working on a Spark cluster and I have two dataframes. One contains text. The other is a lookup table. Both tables are huge (M and N could both easily exceed 100,000 entries). What is the best way to match them?
Doing a cross-join and then filtering the results down to matches seems like a crazy idea, since I would almost certainly run out of memory.
My dataframes look like this:
df1:
text
0 i like apples
1 oranges are good
2 eating bananas is healthy
. ...
. ...
M tomatoes are red, bananas are yellow
df2:
fruit_lookup
0 apples
1 oranges
2 bananas
. ...
. ...
N tomatoes
I expect the output dataframe to look like this:
output_df:
text extracted_fruits
0 i like apples ['apples']
1 oranges are good ['oranges']
2 eating bananas is healthy ['bananas']
. ...
. ...
M tomatoes are red, bananas are yellow ['tomatoes','bananas']
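For reference, the matching logic this output asks for amounts to intersecting each row's tokens with the lookup set. A minimal plain-Python sketch of that per-row logic (illustrative names, not the Spark solution itself); note that with the vocabulary-based approach below, the result order follows the lookup order rather than the order of appearance in the text:

```python
import re

# Toy stand-ins for df1 and df2
texts = [
    "i like apples",
    "oranges are good",
    "eating bananas is healthy",
    "tomatoes are red, bananas are yellow",
]
fruit_lookup = ["apples", "oranges", "bananas", "tomatoes"]

lookup_set = set(fruit_lookup)

def extract_fruits(text):
    # \w+ strips punctuation, so "red," yields the token "red"
    tokens = set(re.findall(r"\w+", text.lower()))
    # keep lookup order, mirroring the vocabulary-index order of the Spark output
    return [f for f in fruit_lookup if f in tokens]

results = [extract_fruits(t) for t in texts]
# results == [['apples'], ['oranges'], ['bananas'], ['bananas', 'tomatoes']]
```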
One way is to use CountVectorizerModel, since it should be able to handle a 100K-entry vocabulary (the default vocabSize is 262144). The basic idea is to create a CountVectorizerModel from a custom vocabulary list built from df2 (the lookup table), split the text column of df1 into an array column (or use pyspark.ml.feature.Tokenizer), and then transform that column:
from pyspark.ml.feature import CountVectorizerModel
from pyspark.sql.functions import split, udf
df2.show()
#+---+------------+
#| id|fruit_lookup|
#+---+------------+
#| 0| apples|
#| 1| oranges|
#| 2| bananas|
#| 3| tomatoes|
#+---+------------+
lst = [ r.fruit_lookup for r in df2.collect() ]
model = CountVectorizerModel.from_vocabulary(lst, inputCol='words_arr', outputCol='fruits_vec')
df3 = model.transform(df1.withColumn('words_arr', split('text', r'\s+')))
df3.show(truncate=False)
#+------------------------------------+-------------------------------------------+-------------------+
#|text |words_arr |fruits_vec |
#+------------------------------------+-------------------------------------------+-------------------+
#|I like apples |[I, like, apples] |(4,[0],[1.0]) |
#|oranges are good |[oranges, are, good] |(4,[1],[1.0]) |
#|eating bananas is healthy |[eating, bananas, is, healthy] |(4,[2],[1.0]) |
#|tomatoes are red, bananas are yellow|[tomatoes, are, red,, bananas, are, yellow]|(4,[2,3],[1.0,1.0])|
#+------------------------------------+-------------------------------------------+-------------------+
Then you can map fruits_vec back to the fruit names using model.vocabulary:
vocabulary = model.vocabulary
#['apples', 'oranges', 'bananas', 'tomatoes']
to_match = udf(lambda v: [ vocabulary[i] for i in v.indices ], 'array<string>')
df_new = df3.withColumn('extracted_fruits', to_match('fruits_vec')).drop('words_arr', 'fruits_vec')
df_new.show(truncate=False)
#+------------------------------------+-------------------+
#|text |extracted_fruits |
#+------------------------------------+-------------------+
#|I like apples |[apples] |
#|oranges are good |[oranges] |
#|eating bananas is healthy |[bananas] |
#|tomatoes are red, bananas are yellow|[bananas, tomatoes]|
#+------------------------------------+-------------------+
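One caveat: splitting on `\s+` keeps punctuation attached to tokens (note the `red,` token in words_arr above), so a fruit immediately followed by a comma would miss the vocabulary. You could pre-clean the text with regexp_replace, or tokenize with pyspark.ml.feature.RegexTokenizer using `pattern='\\W+'` instead of split. A plain-Python sketch of the difference between the two tokenizations:

```python
import re

text = "tomatoes are red, bananas are yellow"

# Whitespace split, as in the answer: punctuation stays attached,
# so 'red,' would fail a vocabulary lookup for 'red'.
ws_tokens = re.split(r"\s+", text)

# Splitting on non-word characters (what RegexTokenizer with
# pattern='\\W+' does) yields clean tokens.
clean_tokens = [t for t in re.split(r"\W+", text) if t]

# 'red,' in ws_tokens  -> True
# 'red'  in clean_tokens -> True
```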