pyspark RDD 计算 DAG 中的节点数

问题描述 投票:0回答:1

我有 RDD,显示为

["2\t{'3': 1}",
 "3\t{'2': 2}",
 "4\t{'1': 1, '2': 1}",
 "5\t{'4': 3, '2': 1, '6': 1}",
 "6\t{'2': 1, '5': 2}",
 "7\t{'2': 1, '5': 1}",
 "8\t{'2': 1, '5': 1}",
 "9\t{'2': 1, '5': 1}",
 "10\t{'5': 1}",
 "11\t{'5': 2}"]

我可以将其拆分,并能够计算“ ”之前的节点,或者我可以编写一个函数来计算右侧的节点。这是一个权重 DAG。如果我用手数的话,我会看到有 11 个节点。但在进行区分和计数之前,我无法弄清楚如何将右侧的节点 1 带入节点。我的代码是

`import ast
def break_nodes(line):
    data_dict = ast.literal_eval(line)
    
    # Iterate through the dictionary items and print them
    for key, value in data_dict.items():
        print(f'key {key} val {value}')
        yield (int(key))
        
    
nodeIDs = dataRDD.map(lambda line: line.split('\t')) \
                    .flatMap(lambda x: break_nodes(x[1])) \
                    .distinct()`

这只是计算 右边的节点。我有左侧的代码,非常简单

`nodeIDs = dataRDD.flatMap(lambda line: line.split('\t')[0])
totalCount = nodeIDs.distinct().count()`

我可以对代码进行哪些修改来计算所有节点?尝试了很多方法我的大脑都快炸了

感谢帮助

python apache-spark pyspark mapreduce
1个回答
0
投票

让我们使用

flatMap
查找 rdd 中的所有节点,然后使用
distinct
获取唯一节点

import ast

def find_all(r):
    x, y = r.split('\t')
    return [x, *ast.literal_eval(y).keys()]

nodes = dataRDD.flatMap(find_all).distinct()

nodes.collect()
# ['4', '5', '10', '2', '1', '9', '3', '6', '7', '8', '11']

nodes.count()
# 11
© www.soinside.com 2019 - 2024. All rights reserved.