我有以下 PySpark 数据框:
身份证 | 价值 |
---|---|
1 | 值-1 |
1 | 值-2 |
1 | 值-3 |
2 | 值-1 |
2 | 值-2 |
我想把它转换成字典:
dict1 = {'1':['value-1','value-2','value-3'], '2':['value-1','value-2']}
我能够做到这一点(在下面写了一个答案),但我需要更简单、更有效的方法,而不需要将数据帧转换为 Pandas。
rdd.collectAsMap
:
from pyspark.sql.functions import collect_list
df_spark.groupBy("ID").agg(collect_list("Value")).rdd.collectAsMap()
groupby
和 to_dict
的方法:
# Convert to Pandas data frame
df_pandas = df_spark.toPandas()
df_pandas.groupby("ID")["Value"].apply(list).to_dict()
{'1': ['value-1', 'value-2', 'value-3'], '2': ['value-1', 'value-2']}
也许,你可以尝试:
import pyspark.sql.functions as F
records = df.groupBy('ID').agg(F.collect_list('Value').alias('List')).collect()
dict1 = {row['ID']: row['List'] for row in records}
print(dict1)
# Output
{1: ['value-1', 'value-2', 'value-3'], 2: ['value-1', 'value-2']}
我首先将 PySpark 数据帧转换为 pandas 数据帧,然后迭代所有单元格。这是 O(M*N) 的迭代时间,但成本高昂的部分是将 PySpark 数据帧转换为 pandas。
import pandas as pd
# Convert to Pandas data frame
df_pandas = df_spark.toPandas()
# Convert pandas data frame to dictionary
dict1= dict()
for i in range(0,len(df_pandas)):
key = df_pandas.iloc[i, 0]
if key not in dict1:
dict1.update({key:[]})
dict1[key].append(df_pandas.iloc[i, 1])
else:
dict1[key].append(df_pandas.iloc[i, 1])
这样的东西应该有效:
import pyspark.sql.functions as F
aggregation = df.groupby("ID").agg(F.collect_list("Value").alias("Value"))
dict(aggregation.rdd.map(lambda x: (x["ID"], x["Value"])).collect())