如何使用reduceByKey(pyspark)嵌套结构?

问题描述 投票:0回答:1

我正在对要基于3个值进行分区并写回到S3的数据集使用spark(pyspark)。数据集如下所示-

customerId,productId,createDate

我想通过customerId然后productId然后createDate对数据进行分区。因此,当我将分区数据写入s3时,它应具有以下结构-

customerId=1
  productId='A1'
    createDate=2019-10
    createDate=2019-11
    createDate=2019-12
  productId='A2'
    createDate=2019-10
    createDate=2019-11
    createDate=2019-12

下面是我用来创建分区的代码。

rdd = sc.textFile("data.json")  #sc is spark context
r1.map(lambda r: (r["customerId"], r["productId"],r["createDate"])).distinct().map(lambda r: (r[0], ([r[1]],[r[2]]))).reduceByKey(lambda a, b: (a[0] + b[0],a[1] + b[1])).collect()

[('1',([A1,A2],['2019-12','2019-11','2019-10','2019-12','2019-11','2019-10']))]]

此代码的确为我提供了一个扁平的结构,而不是我提到的嵌套结构。是否有可能改变我的描述方式。任何指针都是高度赞赏的。

python pyspark rdd reduce
1个回答
0
投票

首先将您的JSON文件读取到数据框。

import json
a=[json.dumps("/data.json")]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)

然后使用groupbycollectlist获得所需的格式。

import pyspark.sql.functions as func
df.groupby('customerId','productId').agg(func.collectList('createDate')).collect()
© www.soinside.com 2019 - 2024. All rights reserved.