我正在尝试基于广播变量过滤大型RDD。
我能够执行以下操作,过滤作为广播变量中的键存在的元组。
nested_filter = {"india":'ind',"usa":'us'}
b_filter = sc.broadcast(nested_filter)
rdd_set = sc.parallelize([('india','ind'),('india', 'nope') , ('usa','us'),
('japan','jpn'),('uruguay','urg')])
过滤:
rdd_set.filter(lambda fields: fields[0] in b_filter.value).collect()
返回:
[('india', 'ind'), ('india', 'nope'), ('usa', 'us')]
我的问题是我想过滤广播词典中的键以及与键相关的值。
正确的结果应该是:
[('india', 'ind'), ('usa', 'us')]
实际的RDD将有数十亿行,其中包含数百万个密钥的广播字典。有人可以告诉我最有效的方法吗?
您可以使用items()
从字典中获取键值元组列表。然后检查该列表中是否有一行:
rdd_set.filter(lambda fields: fields in b_filter.value.items()).collect()
#[('india', 'ind'), ('usa', 'us')]