Spark RDD中的JSON聚合文件

问题描述 投票:-3回答:1

我有一系列看起来与此相似的文件:

[
 {
  'id':1,
  'transactions': [
   {
    'date': '2019-01-01',
    'amount': 50.50
   },
   {
    'date': '2019-01-02',
    'amount': 10.20
   },
  ]
 },
 {
  'id':2,
  'transactions': [
   {
    'date': '2019-01-01',
    'amount': 10.20
   },
   {
    'date': '2019-01-02',
    'amount': 0.50
   },
  ]
 }
]

我使用以下代码将这些文件加载​​到Spark中

users= spark.read.option("multiline", "true").json(file_location)

结果是具有两列idtransactions的SparkData框架,其中transactions是StructType。

我希望能够“映射”每个用户的transactions以进行汇总。

当前我正在使用rdd和一个看起来像这样的函数:

users.rdd.map(lambda a: summarize_transactions(a.transactions))

summary函数可以有两种类型:a)将对象列表转换为熊猫数据框以对其进行汇总。b)遍历对象列表以对其进行汇总。

但是我发现a.transactionspyspark.sql.types.Row的列表。而不是实际的字典。

1)这是实现我的目标的最佳方法吗?2)如何将“ Spark Rows”列表转换为“ Dictionary”的原始列表?

json apache-spark pyspark rdd
1个回答
0
投票

我找到了解决自己的问题的方法:

STEP 1:将数据作为文本文件加载:step1= sc.textFile(file_location)

步骤2:以JSON和FLATMAP格式读取

import json
step2 = step1.map(lambda a: json.loads(a)).flatMap(lambda a: a)

第3步:减少关键地图

setp3 = (
 step2
 .map(lambda line: [line['id'], line['transactions']])
 .reduceByKey(lambda a, b: a + b)
 .mapValues(lambda a: summarize_transactions(a))
)
© www.soinside.com 2019 - 2024. All rights reserved.