使用 numba、polars 和 numpy 高效计算项目协作过滤相似度

问题描述 投票:0回答:1

免责声明这个问题是一个线程的一部分,包括这两个SO问题(q1q2

数据类似于 ml-latest 数据集的 ratings.csv file(~891mb)中的电影评级。

一旦我使用

polars
库读取 csv 文件,例如:

movie_ratings = pl.read_csv(os.path.join(application_path + data_directory, "ratings.csv"))

假设我们要计算用户=1(例如 62 部电影)观看的电影与数据集中其余电影之间的相似度。仅供参考,该数据集有大约 83,000 部电影,因此对于 every other_movie (82,938) 计算与用户 1 观看的每部电影(62 部电影)的相似度。复杂度为 62x82938(迭代)。

在此示例中,报告的基准仅适用于 400/82,938

other_movies

为此,我创建了两个

polars
数据框。第一个数据帧包含
other_movies
(约 82,938 行),第二个数据帧仅包含用户看过的电影(62 行)。

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1 (data related to user 1)
user_rated_movies = list(user_ratings.select(pl.col("movieId")).to_numpy().ravel()) #movies seen by user1
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)

items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    )
    .group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    ).group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

结果是两个

polars
数据框,其中包含行(电影)和列(用户观看的电影以及每个用户的评分)。

第一个数据帧仅包含

other_movies
,我们可以向用户 1 推荐他/她没有见过的内容。

第二个数据帧仅包含用户看过的电影。

接下来我的方法是通过应用 UDF 函数迭代第一个数据帧的每一行。

item_metadata_similarity = (
    items_metadata.with_columns(
        similarity_score=pl.struct(pl.all()).map_elements(
            lambda row: item_compute_similarity_scoring_V2(row, similarity_metric, target_items_metadata),
            return_dtype=pl.List(pl.List(pl.Float64)),
            strategy="threading"
        )
    )
)

,其中

item_compute_similarity_scoring_V2
定义为:

def item_compute_similarity_scoring_V2(
    row,
    target_movies_metadata:pl.DataFrame
):
    users_item1 = np.asarray(row["users_seen_movie"])
    ratings_item1 = np.asarray(row["user_ratings"])
    computed_similarity: list=[]
    for row2 in target_movies_metadata.iter_rows(named=True): #iter over each row from the second dataframe with the movies seen by the user.
        users_item2=np.asarray(row2["users_seen_movie"])
        ratings_item2=np.asarray(row2["user_ratings"])
        r1, r2 = item_ratings(users_item1, ratings_item1, users_item2, ratings_item2)
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = compute_similarity_score(r1, r2)
            if similarity_score > 0.0: #filter out negative or zero similarity scores
                computed_similarity.append((row2["movieId"], similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)
    return most_similar_pairs

item_ratings
compute_similarity_score
定义为

def item_ratings(u1:np.ndarray, r1:np.ndarray, u2:np.ndarray, r2:np.ndarray) -> (np.ndarray, np.ndarray):
    common_elements, indices1, indices2 = np.intersect1d(u1, u2, return_indices=True)
    sr1 = r1[indices1]
    sr2 = r2[indices2]
    assert len(sr1)==len(sr2), "ratings don't have same lengths"
    return sr1, sr2

@jit(nopython=True, parallel=True)
def compute_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
    assert(array1.shape[0] == array2.shape[0])
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i]*array2[i]
        a1a1 += array1[i]*array1[i]
        a2a2 += array2[i]*array2[i]
    cos_theta = 1.0
    if a1a1!=0 and a2a2!=0:
        cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
    return cos_theta

该函数基本上迭代第二个数据帧的每一行,并为每一行计算

other_movie
与用户观看的电影之间的相似度。因此,对于 我们对 400 部电影进行 400*62 次迭代,每个
other_movie
生成 62 个相似度分数。

每次计算的结果都是一个具有模式

[[1, 0.20], [110, 0.34]]...
的数组(每个
other_movie
长度 62 对)

400 部电影的基准

  1. 信息 - 项目-项目:计算 400 部电影的相似度分数:0:01:49.887032
  2. ~2分钟。
  3. ~使用了 5GB RAM。

我想确定如何通过使用本机

polars
命令或利用
numba
并行框架来改进计算。

更新 - 第二种方法使用
to_numpy()
操作,没有
iter_rows()
map_elements()

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1
user_rated_movies = user_ratings.select(pl.col("movieId")).to_numpy().ravel()
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(items_metadata.head(5))
target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(target_items_metadata.head(5))

采用第二种方法,

items_metadata
target_items_metadata
是两个大型极坐标表。

然后我的下一步是使用

numpy.ndarrays
命令将两个表保存到
to_numpy()
中。

items_metadata_array = items_metadata.to_numpy()
target_items_metadata_array = target_items_metadata.to_numpy()
computed_similarity_scores:dict = {}
for i, other_movie in enumerate(potential_movies_to_recommend[:400]): #take the first 400 unseen movies by user 1
    mask = items_metadata_array[:, 1] == other_movie
    other_movies_chunk = items_metadata_array[mask]
    u1 = other_movies_chunk[:,0].astype(np.int32)
    r1 = other_movies_chunk[:,2].astype(np.float32)
    computed_similarity: list=[]
    for i, user_movie in enumerate(user_rated_movies):
        print(user_movie)
        mask = target_items_metadata_array[:, 1] == user_movie
        target_movie_chunk = target_items_metadata_array[mask]
        u2 = target_movie_chunk[:,0].astype(np.int32)
        r2 = target_movie_chunk[:,2].astype(np.float32)
        common_r1, common_r2 = item_ratings(u1, r1, u2, r2)
        if common_r1.shape[0] != 0 and common_r2.shape[0] != 0:
            similarity_score = compute_similarity_score(common_r1, common_r2)
            if similarity_score > 0.0:
                computed_similarity.append((user_movie, similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
    computed_similarity_scores[str(other_movie)] = most_similar_pairs

第二种进场的基准(8.50分钟>第一种进场的6分钟)

  • Item-Item:计算 400 部电影的相似度分数:0:08:50.537102
python dataframe numpy numba python-polars
1个回答
0
投票

这是当前方法的直接翻译(即它仍然包含“双 for 循环”),因此它只是作为“原生 Polars”版本的基线尝试。

因为它使用 Lazy API,所以循环中不会计算任何内容。

这一切都在调用

.collect()
时完成(这允许 Polars 并行化工作)。

收集结果后将完成

> 0.0
相似度得分的过滤。

input_id = 1

is_user_rating = pl.col("userId") == input_id

can_recommend = (
    pl.col("movieId").is_in(pl.col("movieId").filter(is_user_rating)).not_()
)

cosine_similarity = (
    pl.col('rating').dot('rating_right') /  
    ( pl.col('rating').pow(2).sum().sqrt() * 
      pl.col('rating_right').pow(2).sum().sqrt() ) 
)

user_rated_movies = movie_ratings.filter(is_user_rating).select("movieId").to_series()

potential_movies_to_recommend = (
    movie_ratings.filter(can_recommend).select(pl.col("movieId").unique().sort())
)

# use the Lazy API so we can compute in parallel
df = movie_ratings.lazy()

computed_similarity_scores = []
for other_movie in potential_movies_to_recommend.head(1).to_series(): # .head(N) potential movies
    for user_movie in user_rated_movies:
        score = (
            df.filter(pl.col("movieId") == user_movie)
              .join(
                 df.filter(pl.col("movieId") == other_movie),
                 on = "userId"
              )
              .select(cosine = cosine_similarity)
              .select(user_movie=user_movie, other_movie=other_movie, similarity_score="cosine")
        )
        computed_similarity_scores.append(score)
        
# All scores are computed in parallel
computed_similarity_scores_polars = pl.concat(computed_similarity_scores).collect()
shape: (62, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ ---        ┆ ---         ┆ ---              │
│ i32        ┆ i32         ┆ f64              │
╞════════════╪═════════════╪══════════════════╡
│ 1          ┆ 2           ┆ 0.95669          │
│ 110        ┆ 2           ┆ 0.950086         │
│ 158        ┆ 2           ┆ 0.957631         │
│ 260        ┆ 2           ┆ 0.945542         │
│ …          ┆ …           ┆ …                │
│ 49647      ┆ 2           ┆ 0.9411           │
│ 52458      ┆ 2           ┆ 0.955353         │
│ 53996      ┆ 2           ┆ 0.930388         │
│ 54259      ┆ 2           ┆ 0.95469          │
└────────────┴─────────────┴──────────────────┘

测试

.head(100)
对于您的示例,我得到
58s
运行时间与
111s
运行时间,内存消耗是相同的。

结果是

72_602
行。

如果我删除内部 for 循环和第一个过滤器,运行时间是

20s
但我最终会得到
9_346_132
行。

72_602
行都存在于较大的结果中,但如果删除第一个过滤器适用于较大的用户,则我不会。

userId=189614
好像是最大的,他们所有的电影都是一个路口的一部分,他们有
33_332

© www.soinside.com 2019 - 2024. All rights reserved.