我试图使用词袋模型构建一个基于内容的推荐系统。我接下来的教程使用 sklearn 库中大小为 (4000,5000) 的向量的余弦相似度,其中 4000 是数据集中的行数,5000 是特征数。
from sklearn.feature_extraction.text import CountVectorizer
cv = CountVectorizer(max_features=5000, stop_words='english')
vectors = cv.fit_transform(new_df['tags']).toarray()
// here new_df is the dataframe and new_df[tags] contain all the tags (eg: location, genre) based on which recommendation will be performed
但是当我尝试在另一个具有 94955 行的数据集上实现余弦相似度(这会产生大小为 (94955, 5000) 的向量时,我收到以下错误
MemoryError: Unable to allocate 67.2 GiB for an array with shape (94955, 94955) and data type float64
上线
similarity = cosine_similarity(vectors, dense_output=False)
有没有办法实现余弦相似度的批处理,以便我可以克服这个问题,或者我应该改变算法?
sklearn.metrics.pairwise.cosine_similarity()
计算 vectors
中所有样本之间的成对相似度并返回形状数组 (94955, 94955)。每次计算都是在 5000 维向量上执行的。这对于单台机器来说内存成本太高,并且可能无法垂直扩展。
这个问题可以通过水平缩放来解决。您可以使用 Spark 来实现此目的。
我们有类似的要求来计算大量数据的成对相似度。我为此编写了一个矢量化余弦相似度计算代码。花了几个小时;但就我而言,最终完成了。与 1000 维向量进行了 75,000 x 75,000 次相似性比较。
这个想法是将数据帧转换为大量(a,b)对,并对列应用以下向量化操作。 Spark 将在不同的工作机器上分配对。
首先,使用
pyspark.ml.feature.VectorAssembler
将特征转换为两个向量列。这两个向量将用于计算成对相似度。
from pyspark.ml.feature import VectorAssembler
feature_col_names = [...] # TODO - collect your feature column names
assembler = VectorAssembler(inputCols=feature_col_names, outputCol="features_vec_1")
df = assembler.transform(df)
from pyspark.sql.functions import col
df = df.withColumn("features_vec_2", col("features_vec_1"))
计算两个向量列的交叉连接以获得成对相似性:
df = df.select("features_vec_1").crossJoin(df.select("features_vec_2"))
应用以下矢量化列运算来计算余弦相似度:
import pyspark.sql.functions as fn
df = df.withColumn("axb", fn.zip_with("features_vec_1", "features_vec_2", lambda x, y: x * y))
df = df.withColumn("axa", fn.zip_with("features_vec_1", "features_vec_1", lambda x, y: x * y))
df = df.withColumn("bxb", fn.zip_with("features_vec_2", "features_vec_2", lambda x, y: x * y))
df = df.withColumn("dot_prod", fn.expr("AGGREGATE(axb, DOUBLE(0), (a,x)->a+x)"))
df = df.withColumn("norm_1", fn.expr("sqrt(AGGREGATE(axa, DOUBLE(0), (a,x)->a+x))"))
df = df.withColumn("norm_2", fn.expr("sqrt(AGGREGATE(bxb, DOUBLE(0), (a,x)->a+x))"))
df = df.withColumn("cosine_similarity", fn.lit(fn.col("dot_prod") / (fn.col("norm_1") * fn.col("norm_2"))))
df = df.drop("axb", "axa", "bxb", "dot_prod", "norm_1", "norm_2")
您可能需要应用一些终止操作(如
collect()
或df.write
)来触发上述计算。
df.write.mode("overwrite").parquet("/file/path")
df = sparkSession.read.parquet("/file/path")
OR
similarity_list = df.collect()
此时,您已经按照以下成对方式计算了余弦相似度:
+--------------+--------------+-----------------+
|features_vec_1|features_vec_2|cosine_similarity|
+--------------+--------------+-----------------+
| row_1| row_1| 1.0|
| row_1| row_2| 0.5|
| row_1| ... | |
| row_1| row_94955| 0.7|
| row_2| row_1| 0.5|
| row_2| row_2| 1.0|
| row_2| ... | |
| row_2| row_94955| 0.2|
| row_94955| row_1| 0.7|
| row_94955| row_2| 0.2|
| row_94955| ... | |
| row_94955| row_94955| 1.0|
+--------------+--------------+-----------------+