加入两个流水线RDD

问题描述 投票:0回答:1

我正在尝试在pyspart jupyter笔记本中使用.join()加入两个流水线的RDD

第一个RDD:

primaryType.take(5)

['DECEPTIVE PRACTICE',
 'CRIM SEXUAL ASSAULT',
 'BURGLARY',
 'THEFT',
 'CRIM SEXUAL ASSAULT']

第二个RDD:

districts.take(5)
['004', '022', '008', '003', '001']

加入RDD:

rdd_joined = primaryType.join(districts)
rdd_joined.take(5)

输出:

[]

我在这里错了什么?

pyspark jupyter-notebook bigdata rdd
1个回答
0
投票

应该有一些唯一的键来同时连接两个rdds,所以请使用rdd.zipWithIndex()为两个rdds创建索引,然后尝试将它们连接在一起

districts.take(5)
[(0, '004'), (1, '022'), (2, '008'), (3, '003'), (4, '001')]

primaryType.take(5)

['DECEPTIVE PRACTICE',
 'CRIM SEXUAL ASSAULT',
 'BURGLARY',
 'THEFT',
 'CRIM SEXUAL ASSAULT']

districts=districts.zipWithIndex()

districts.take(5)
[('004', 0), ('022', 1), ('008', 2), ('003', 3), ('001', 4)]

districts=districts.map(lambda (x,y):(y,x))

primaryType=primaryType.zipWithIndex()
primaryType=primaryType.map(lambda (x,y):(y,x))
primaryType.join(districts).map(lambda (x,y):y).take(5)

[('DECEPTIVE PRACTICE', '004'), ('CRIM SEXUAL ASSAULT', '001'), ('CRIM SEXUAL ASSAULT', '022'), ('BURGLARY', '008'), ('THEFT', '003')]
© www.soinside.com 2019 - 2024. All rights reserved.