从 PySpark RDD 中删除重复的元组对

问题描述 投票:0回答:2

我得到了一个rdd。例子: 测试 = sc.parallelize([(1,0), (2,0), (3,0)])

我需要获取笛卡尔积并删除具有重复条目的结果元组对。 在这个玩具示例中,这些将是 ((1, 0), (1, 0)), ((2, 0), (2, 0)), ((3, 0), (3, 0))。

我可以按如下方式获得笛卡尔积:注意收集和打印语句仅适用于 故障排除。

def compute_cartesian(rdd):
    result1 = sc.parallelize(sorted(rdd.cartesian(rdd).collect()))
    print(type(result1))
    print(result1.collect())

我在这个阶段的类型和输出是正确的:

<class 'pyspark.rdd.RDD'>
[((1, 0), (1, 0)), ((1, 0), (2, 0)), ((1, 0), (3, 0)), ((2, 0), (1, 0)), ((2, 0), (2, 0)), ((2, 0), (3, 0)), ((3, 0), (1, 0)), ((3, 0), (2, 0)), ((3, 0), (3, 0))]

但是现在我需要删除三对具有重复条目的元组。

到目前为止尝试过:

  1. .distinct() 运行但不会产生正确的结果 rdd。
  2. .dropDuplicates() 不会运行。我认为这是 .dropDuplicates() 的错误用法。
  3. 手动功能:

如果没有 RDD,这个任务很容易。

# Remove duplicates
for elem in result:
    if elem[0] == elem[1]:
        result.remove(elem)
print(result)
print("After: ", len(result))

这是我编写的一个函数,它删除重复的元组对,然后吐出结果 len,以便我可以进行健全性检查。

我只是不确定如何直接对 RDD 执行操作,在这种情况下删除笛卡尔积产生的任何重复元组对,并返回一个 RDD。

是的,我可以 .collect() 它,执行操作,然后将其重新键入为 RDD,但这达不到目的。假设这是数十亿对。我需要对rdd执行操作并返回一个rdd。

python-3.x apache-spark pyspark rdd
2个回答
1
投票

您可以使用

filter
删除您不想要的对:

dd.cartesian(rdd).filter(lambda x: x[0] != x[1])

请注意,我不会将这些对称为“重复对”,而是“重复对”,甚至更好,“对角线对”:如果您以几何方式可视化笛卡尔积,它们对应于对角线。

这就是为什么

distinct
dropDuplicates
在这里不合适:它们会删除重复项,这不是您想要的。例如,
[1,1,2].distinct()
[1,2]


0
投票
pairs = rdd.flatMap(lambda x:  [(x[0],y) for y in x[1:]])
© www.soinside.com 2019 - 2024. All rights reserved.