免责声明这个问题是一个线程的一部分,包括这两个SO问题(q1,q2)
数据类似于 ml-latest 数据集的 ratings.csv file(~891mb)中的电影评级。
一旦我使用
polars
库读取 csv 文件,例如:
movie_ratings = pl.read_csv(os.path.join(application_path + data_directory, "ratings.csv"))
假设我们要计算用户=1(例如 62 部电影)观看的电影与数据集中其余电影之间的相似度。仅供参考,该数据集有大约 83,000 部电影,因此对于 every other_movie (82,938) 计算与用户 1 观看的每部电影(62 部电影)的相似度。复杂度为 62x82938(迭代)。
在此示例中,报告的基准仅适用于 400/82,938
other_movies
为此,我创建了两个
polars
数据框。第一个数据帧包含 other_movies
(约 82,938 行),第二个数据帧仅包含用户看过的电影(62 行)。
user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1 (data related to user 1)
user_rated_movies = list(user_ratings.select(pl.col("movieId")).to_numpy().ravel()) #movies seen by user1
potential_movies_to_recommend = list(
movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
movie_ratings.filter(
~pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
)
.group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
target_items_metadata = (
movie_ratings.filter(
pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
).group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
结果是两个
polars
数据框,其中包含行(电影)和列(用户观看的电影以及每个用户的评分)。
第一个数据帧仅包含
other_movies
,我们可以向用户 1 推荐他/她没有见过的内容。
第二个数据帧仅包含用户看过的电影。
接下来我的方法是通过应用 UDF 函数迭代第一个数据帧的每一行。
item_metadata_similarity = (
items_metadata.with_columns(
similarity_score=pl.struct(pl.all()).map_elements(
lambda row: item_compute_similarity_scoring_V2(row, similarity_metric, target_items_metadata),
return_dtype=pl.List(pl.List(pl.Float64)),
strategy="threading"
)
)
)
,其中
item_compute_similarity_scoring_V2
定义为:
def item_compute_similarity_scoring_V2(
row,
target_movies_metadata:pl.DataFrame
):
users_item1 = np.asarray(row["users_seen_movie"])
ratings_item1 = np.asarray(row["user_ratings"])
computed_similarity: list=[]
for row2 in target_movies_metadata.iter_rows(named=True): #iter over each row from the second dataframe with the movies seen by the user.
users_item2=np.asarray(row2["users_seen_movie"])
ratings_item2=np.asarray(row2["user_ratings"])
r1, r2 = item_ratings(users_item1, ratings_item1, users_item2, ratings_item2)
if r1.shape[0] != 0 and r2.shape[0] != 0:
similarity_score = compute_similarity_score(r1, r2)
if similarity_score > 0.0: #filter out negative or zero similarity scores
computed_similarity.append((row2["movieId"], similarity_score))
most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)
return most_similar_pairs
、
item_ratings
和 compute_similarity_score
定义为
def item_ratings(u1:np.ndarray, r1:np.ndarray, u2:np.ndarray, r2:np.ndarray) -> (np.ndarray, np.ndarray):
common_elements, indices1, indices2 = np.intersect1d(u1, u2, return_indices=True)
sr1 = r1[indices1]
sr2 = r2[indices2]
assert len(sr1)==len(sr2), "ratings don't have same lengths"
return sr1, sr2
@jit(nopython=True, parallel=True)
def compute_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
assert(array1.shape[0] == array2.shape[0])
a1a2 = 0
a1a1 = 0
a2a2 = 0
for i in range(array1.shape[0]):
a1a2 += array1[i]*array2[i]
a1a1 += array1[i]*array1[i]
a2a2 += array2[i]*array2[i]
cos_theta = 1.0
if a1a1!=0 and a2a2!=0:
cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
return cos_theta
该函数基本上迭代第二个数据帧的每一行,并为每一行计算
other_movie
与用户观看的电影之间的相似度。因此,对于
我们对 400 部电影进行 400*62 次迭代,每个other_movie
生成 62 个相似度分数。
每次计算的结果都是一个具有模式
[[1, 0.20], [110, 0.34]]...
的数组(每个 other_movie
长度 62 对)
400 部电影的基准
我想确定如何通过使用本机
polars
命令或利用 numba
并行框架来改进计算。
to_numpy()
操作,没有 iter_rows()
和 map_elements()
user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1
user_rated_movies = user_ratings.select(pl.col("movieId")).to_numpy().ravel()
potential_movies_to_recommend = list(
movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
movie_ratings.filter(
~pl.col("movieId").is_in(user_rated_movies)
)
)
# print(items_metadata.head(5))
target_items_metadata = (
movie_ratings.filter(
pl.col("movieId").is_in(user_rated_movies)
)
)
# print(target_items_metadata.head(5))
采用第二种方法,
items_metadata
和target_items_metadata
是两个大型极坐标表。
然后我的下一步是使用
numpy.ndarrays
命令将两个表保存到 to_numpy()
中。
items_metadata_array = items_metadata.to_numpy()
target_items_metadata_array = target_items_metadata.to_numpy()
computed_similarity_scores:dict = {}
for i, other_movie in enumerate(potential_movies_to_recommend[:400]): #take the first 400 unseen movies by user 1
mask = items_metadata_array[:, 1] == other_movie
other_movies_chunk = items_metadata_array[mask]
u1 = other_movies_chunk[:,0].astype(np.int32)
r1 = other_movies_chunk[:,2].astype(np.float32)
computed_similarity: list=[]
for i, user_movie in enumerate(user_rated_movies):
print(user_movie)
mask = target_items_metadata_array[:, 1] == user_movie
target_movie_chunk = target_items_metadata_array[mask]
u2 = target_movie_chunk[:,0].astype(np.int32)
r2 = target_movie_chunk[:,2].astype(np.float32)
common_r1, common_r2 = item_ratings(u1, r1, u2, r2)
if common_r1.shape[0] != 0 and common_r2.shape[0] != 0:
similarity_score = compute_similarity_score(common_r1, common_r2)
if similarity_score > 0.0:
computed_similarity.append((user_movie, similarity_score))
most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
computed_similarity_scores[str(other_movie)] = most_similar_pairs
第二种进场的基准(8.50分钟>第一种进场的6分钟)
这是当前方法的直接翻译(即它仍然包含“双 for 循环”),因此它只是作为“原生 Polars”版本的基线尝试。
因为它使用 Lazy API,所以循环中不会计算任何内容。
这一切都在调用
.collect()
时完成(这允许 Polars 并行化工作)。
收集结果后将完成
> 0.0
相似度得分的过滤。
input_id = 1
is_user_rating = pl.col("userId") == input_id
can_recommend = (
pl.col("movieId").is_in(pl.col("movieId").filter(is_user_rating)).not_()
)
cosine_similarity = (
pl.col('rating').dot('rating_right') /
( pl.col('rating').pow(2).sum().sqrt() *
pl.col('rating_right').pow(2).sum().sqrt() )
)
user_rated_movies = movie_ratings.filter(is_user_rating).select("movieId").to_series()
potential_movies_to_recommend = (
movie_ratings.filter(can_recommend).select(pl.col("movieId").unique().sort())
)
# use the Lazy API so we can compute in parallel
df = movie_ratings.lazy()
computed_similarity_scores = []
for other_movie in potential_movies_to_recommend.head(1).to_series(): # .head(N) potential movies
for user_movie in user_rated_movies:
score = (
df.filter(pl.col("movieId") == user_movie)
.join(
df.filter(pl.col("movieId") == other_movie),
on = "userId"
)
.select(cosine = cosine_similarity)
.select(user_movie=user_movie, other_movie=other_movie, similarity_score="cosine")
)
computed_similarity_scores.append(score)
# All scores are computed in parallel
computed_similarity_scores_polars = pl.concat(computed_similarity_scores).collect()
shape: (62, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ --- ┆ --- ┆ --- │
│ i32 ┆ i32 ┆ f64 │
╞════════════╪═════════════╪══════════════════╡
│ 1 ┆ 2 ┆ 0.95669 │
│ 110 ┆ 2 ┆ 0.950086 │
│ 158 ┆ 2 ┆ 0.957631 │
│ 260 ┆ 2 ┆ 0.945542 │
│ … ┆ … ┆ … │
│ 49647 ┆ 2 ┆ 0.9411 │
│ 52458 ┆ 2 ┆ 0.955353 │
│ 53996 ┆ 2 ┆ 0.930388 │
│ 54259 ┆ 2 ┆ 0.95469 │
└────────────┴─────────────┴──────────────────┘
测试
.head(100)
对于您的示例,我得到 58s
运行时间与 111s
运行时间,内存消耗是相同的。
结果是
72_602
行。
如果我删除内部 for 循环和第一个过滤器,运行时间是
20s
但我最终会得到 9_346_132
行。
72_602
行都存在于较大的结果中,但如果删除第一个过滤器适用于较大的用户,则我不会。
userId=189614
好像是最大的,他们所有的电影都是一个路口的一部分,他们有33_332
。