How to count the number of equal values in every pair of rows of a pyspark RDD

Problem description

I am trying to implement LSH in pyspark. To do this I created a min-hash signature for every document in the collection and then split it into bands (here I post a simplified example with only 2 bands and band signatures made of 5 hashes).

I used this function:

# group the hash values per (doc, band) key and reshape each group
# into a [<band>, <doc>, <signature>] record
signatures = signatures.groupByKey() \
    .map(lambda x: (x[0], list(x[1]))) \
    .groupByKey() \
    .map(lambda x: [x[0][1], x[0][0], list(x[1])[0]]) \
    .cache()

which returned this output:

[1, 1, [31891011288540205849559551829790241508456516432, 28971434183002082500813759681605076406295898007, 84354247191629359011614612371642003229438145118, 14879564999779411946535520329978444194295073263, 28999405396879353085885150485918753398187917441]]
[2, 2, [6236085341917560680351285350168314740288121088, 28971434183002082500813759681605076406295898007, 47263781832612219468430472591505267902435456768, 48215701840864104930367382664962486536872207556, 28999405396879353085885150485918753398187917441]]
[1, 3, [274378016236983705444587880288109426115402687, 120052627645426913871540455290804229381930764767, 113440107283022891200151098422815365240954899060, 95554518001487118601391311753326782629149232562, 84646902172764559093309166129305123869359546269]]
[2, 4, [6236085341917560680351285350168314740288121088, 28971434183002082500813759681605076406295898007, 47263781832612219468430472591505267902435456768, 48215701840864104930367382664962486536872207556, 28999405396879353085885150485918753398187917441]]
[1, 5, [6236085341917560680351285350168314740288121088, 28971434183002082500813759681605076406295898007, 47263781832612219468430472591505267902435456768, 48215701840864104930367382664962486536872207556, 28999405396879353085885150485918753398187917441]]

using this scheme: [<band>, <doc>, <signature>]
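
For illustration, an RDD in that [<band>, <doc>, <signature>] shape could be built from per-document signatures with a sketch like the one below (this assumes the input is an RDD of (doc_id, full_signature) pairs; the toy values and the ROWS_PER_BAND constant are hypothetical, not from my data):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Hypothetical input: one (doc_id, full_signature) pair per document,
# with 10 min-hash values per document (2 bands x 5 rows); values are made up.
full_signatures = sc.parallelize([
    (1, [11, 12, 13, 14, 15, 21, 22, 23, 24, 25]),
    (2, [11, 12, 13, 14, 15, 31, 32, 33, 34, 35]),
])

ROWS_PER_BAND = 5  # assumed number of hashes per band

def split_into_bands(pair):
    doc_id, sig = pair
    # emit one [<band>, <doc>, <signature>] record per band of the signature
    return [[b + 1, doc_id, sig[b * ROWS_PER_BAND:(b + 1) * ROWS_PER_BAND]]
            for b in range(len(sig) // ROWS_PER_BAND)]

signatures = full_signatures.flatMap(split_into_bands).cache()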

Now, my question is: how can I simulate a nested for loop in pyspark so that, for every pair of signatures (DOCi, DOCj) with i != j, I count how many elements the two signatures have in common and return a collection of tuples of the form:

(DOCi, DOCj, number of elements the signatures have in common)

Also, I can only compare elements that have the same band number. How can I do that? And is this the right way to implement LSH in pyspark?

python apache-spark pyspark data-mining lsh
1 Answer

IIUC, there is no need for a nested for loop, just use itertools.combinations inside a list comprehension:

from itertools import combinations

# group the [<band>, <doc>, <signature>] records by band, then for every pair of
# docs within the same band emit (doc_i, doc_j, number of shared hash values)
signatures.groupBy(lambda x: x[0]) \
    .mapValues(lambda x: set((y1[1], y2[1], len(set(y1[2]).intersection(y2[2])))
                             for y1, y2 in combinations(x, 2))) \
    .collect()
#[(2, {(2, 4, 5)}), (1, {(1, 3, 0), (1, 5, 2), (3, 5, 0)})]
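
Building on this (a sketch, not part of the answer above): in classic LSH a pair (doc_i, doc_j) becomes a candidate when its band signatures are identical in at least one band, which the same grouping pattern can give you; candidate_pairs is a name introduced here only for illustration:

from itertools import combinations

# a pair (doc_i, doc_j) is an LSH candidate when the two band signatures are
# identical in at least one band
candidate_pairs = signatures.groupBy(lambda x: x[0]) \
    .flatMap(lambda kv: [tuple(sorted((y1[1], y2[1])))
                         for y1, y2 in combinations(kv[1], 2)
                         if y1[2] == y2[2]]) \
    .distinct()

candidate_pairs.collect()
# [(2, 4)] for the sample data in the question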

Note: the RDD-based pyspark.mllib has been in maintenance mode since Spark 2.0, so if you can build your application on top of DataFrames, check pyspark.ml.feature.MinHashLSH, which would simplify your task.
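
For example, a minimal MinHashLSH sketch over DataFrames might look like the following (the toy vectors and the 0.8 distance threshold are placeholders, not values from the question):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()

# Toy documents encoded as sparse binary vectors (1.0 where a shingle is present).
df = spark.createDataFrame([
    (0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0])),
    (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0])),
    (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0])),
], ["id", "features"])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(df)

# Self-join: keep document pairs whose estimated Jaccard distance is below 0.8.
model.approxSimilarityJoin(df, df, 0.8, distCol="JaccardDistance") \
    .filter(col("datasetA.id") < col("datasetB.id")) \
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            "JaccardDistance") \
    .show()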
