我有一系列看起来与此相似的文件:
[
{
'id':1,
'transactions': [
{
'date': '2019-01-01',
'amount': 50.50
},
{
'date': '2019-01-02',
'amount': 10.20
},
]
},
{
'id':2,
'transactions': [
{
'date': '2019-01-01',
'amount': 10.20
},
{
'date': '2019-01-02',
'amount': 0.50
},
]
}
]
我使用以下代码将这些文件加载到Spark中
users= spark.read.option("multiline", "true").json(file_location)
结果是具有两列id
和transactions
的SparkData框架,其中transactions
是StructType。
我希望能够“映射”每个用户的transactions
以进行汇总。
当前我正在使用rdd和一个看起来像这样的函数:
users.rdd.map(lambda a: summarize_transactions(a.transactions))
summary函数可以有两种类型:a)将对象列表转换为熊猫数据框以对其进行汇总。b)遍历对象列表以对其进行汇总。
但是我发现a.transactions
是pyspark.sql.types.Row
的列表。而不是实际的字典。
1)这是实现我的目标的最佳方法吗?2)如何将“ Spark Rows”列表转换为“ Dictionary”的原始列表?
我找到了解决自己的问题的方法:
STEP 1:将数据作为文本文件加载:step1= sc.textFile(file_location)
步骤2:以JSON和FLATMAP格式读取
import json
step2 = step1.map(lambda a: json.loads(a)).flatMap(lambda a: a)
第3步:减少关键地图
setp3 = (
step2
.map(lambda line: [line['id'], line['transactions']])
.reduceByKey(lambda a, b: a + b)
.mapValues(lambda a: summarize_transactions(a))
)