Split a large JSON file into multiple smaller JSON files based on a JSON field


I have a gzipped JSON file with a complex JSON structure that is newline-delimited (multiLine false). It has about 5 million records, and each record is its own JSON object. The JSON has a field called event.event_name, and I want to split the large file by event.event_name and publish the split files to S3, one file per event.event_name. I tried reading the large file with

textRDD = spark.sparkContext.textFile(file_path)

It reads the file very quickly, in about 4 seconds. Then I filter with

filteredRDD = textRDD.filter(lambda line: f'"event_name":"{event_name}"' in line) 

which takes 150 seconds. Then I load the JSON with

df = spark.read.option("multiLine", "false").json(filteredRDD) 

and that takes another 150 seconds. But when I read directly with

df_raw = spark.read.option("multiLine", "false").json(file_paths)

it takes 500 seconds. Then I filter with

df = df_raw.filter(df_raw["event.event_name"] == event_name)

which takes 30 seconds. I am running this on AWS Glue with 2 DPUs (4 cores and 16 GB of memory per DPU) and Spark 3.3. Is there a faster way to read event.event_name, group the records by it, and process them more quickly? I even flatten the JSON before writing it back out as CSV:

def flatten_json(df, schema, prefix=""):
    flat_cols = []
    for field in schema.fields:
        field_name = prefix + field.name
        field_type = field.dataType
        if isinstance(field_type, StructType):
            nested_cols = flatten_json(df, field_type, field_name + ".")
            flat_cols.extend(nested_cols)
        else:
            flat_cols.append(col(prefix + field.name).alias(field_name.replace(".", "_")))
    return flat_cols

Then, using flat_cols, I select the fields from df, for example

flat_cols = flatten_json(df, df.schema)
selected_df = df.select(*flat_cols)

Flattening the columns takes 180 seconds, and the select takes 100 seconds for each event.event_name.

Is there a simple way to convert the JSON to CSV or a similar flat-file structure? My file is gzipped and has 5 million records.

The structure of the JSON is:
{"app_name":"Foundation","export_hour":"13","export_doc_id":"65b387a65ed33a5","db_name":"Foundation","event":{"uid":"KrPVghgdR3=yjuW5byt2","event_type":"CAMPAIGN_EVENT","event_code":"NOTIFY","push_id":"dK-GiF-kbknW:APA90ISVz9ws3tLJPgC0W0uJs_-56Sa64Hxt2_pIioCVTyv7uk7dYhjTieAc3gGS2CgsBCdg4","event_name":"Received","user_attributes":{"subscription-status":"CANCELLED","guided_session_time":"4:30","completion_flag":true,"unsubscribe":false,"id":"KrPVghgdR3fuKPCyjuW5byt2","no_of_sessions":305,"Daily Mystic Quote":true,"renewal-type":"NA","last_known_country":"India","New Podcasts":true,"ex_currency":"INR","BSP_flag":true,"md_no_of_day_missed":40,"monthly_satsang_language":"Tam","self_practice_session_time":"17:30","subscription-type":"ANNUAL","gender":"Unknown","md_remind_before_mins":15,"sgex-currency":"INR","cyw_flag":false,"sgex-cancellation-reason":"Renewed Annual Subscription","Journey status":"completed","presence_reminder":"Yes","mandala_start_dt":"2021-11-23","sgex-trial-type":"INTRODUCTORY","md_practice_time_morning":"04:30 AM","email":"[email protected]","md_no_of_day_completed":40,"last_name":"Amb","onboard_reason_1":"Wisdom","onboard_reason_2":"Meditation","sso_country":"IN","samyama_flag":true,"user-status":"Active","cancelled-date":1705760434000,"ex Subscription Status":"ACTIVE","monthly_satsang_flag":"Yes","user_time_zone_offset_(mins)":330,"trial-days":7,"moe_rsp_android":200,"onboard_reason_3":"Yoga","hard_bounce":false,"ssbd_my_steps":"Chew Your Food Well,Drink Water with Reverence,Don't Talk While Eating,Apply Sacred Ash,Breathe Right,Consume Honey in Warm Water","spam":false,"lom_flag":false,"last_known_city":"Madai","Email":"[email protected]","last_seen":1706594768000,"sso_lastname":"Amb","md_practice_time_evening":"05:30 PM","wake_up_alarm_page_alarm_enabled":"Yes","DLCD_Flag":"Yes","Days Practiced":26,"New Videos":true,"onboard_journey":"Volunteer","wake_up_alarm_page_days_selected":"Monday,Tuesday,Wednesday,Thursday,Friday,Saturday,Sunday","My Chants Playlist":14,"kriya_reminder":"Yes","ie_flag":true,"Monthly_Meditator_Satsang":"Yes","no_of_conversions":0,"user_creation_source":"sdk","Device":"Android","pooja_reminder":"Yes","Language Preference":"ta","sso_ieo_regs_flag":false,"guestUserInstallationId":"17b3f4583-c9d741809549","install_status":true,"last_known_pincode":693020,"shoonya_flag":true,"Meditator":true,"name":"Kam","trial-status":"Expired","onboard_goal_1":"Energy","onboard_goal_3":"Space","onboard_goal_2":"conscious","subscription-date":1674312082000,"first_seen":1699538478606,"presence_reminder_time":"15min","last_known_state":"Nadu","meditation_health_reminder":"Yes","rd_flag":true,"mandala_end_dt":"1837814400000.0","sso_name":"Kam","AI-Clicked":"Yes","class_status":"","sgex-amount":900,"moengage_user_id":"654dc002bee2163","Minutes Practiced":536,"IECSO_flag":false,"sg-ex Subscription Type":"SE-Yly","onboard_age":"Between 35 and 44","goy_online_flag":false,"first_name":"Kam","Streak Day":7,"ltv":0.0,"Mobile Token":"fcm:dK-GiFKUQGG5UwQ1-kbknW:APA9z9ws3tLJPgC0W0uJs_-56Sa64Hxt2_pIioeb2b9idYhjTieAc3gGS2CgsBCdg4","wake_up_alarm_page_time_selected":"3,0","FMF_flag":true,"sso_created":"1582148415"},"event_source":"MOENGAGE","event_attributes":{"campaign_name":"30Jan-PN4","moe_campaign_tags":["engagement"],"sent_epoch_time":"1706621844","campaign_type":"GENERAL","moe_campaign_channel":"Push","moe_delivery_type":"One 
Time","campaign_id":"65b74b4676fadf","campaign_channel":"PUSH"},"event_uuid":"ef9051db-42dd-851ba44b","event_time":1706621846,"device_attributes":{"moengage_device_id":"66a75e4a-d104e3935f56"}},"export_day":"2024-01-30"}

The structure is not static; it is dynamic and varies from record to record.

Can you help me split the JSON by event.event_name, flatten it, and save it as CSV in S3? I also do not have a unique list of event.event_name values. There are about 70 events in the file that I want to process, and my code takes around 5 hours to handle them. I want to make it faster.

json apache-spark pyspark
1 Answer

One approach is the following.

You may end up with multiple files per event_name value; these can be merged later as a post-processing step (see the merge sketch after the output listing below).

import pyspark.sql.functions as F
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Split Json Files").getOrCreate()
spark.conf.set('spark.sql.caseSensitive', True)  # the sample record has attributes that differ only by case, e.g. "email" and "Email"
inputFile = "../data/json5mfile.json"
outputDir = "../data_output/partitioned_output"

df = spark.read.json(inputFile)
df = df.withColumn("event_name_partition", F.col("event.event_name"))
df.show(n=100, truncate=False)

#------------
#Further transformations here that you want to be done.
#-----------

## As there will be multiple executors and the data will be distributed on them, they will each try to write their own file.
## So you can coalesce the file as a post-processing step later on.
df.write.partitionBy("event_name_partition").format("json").mode("overwrite").option("maxRecordsPerFile", 100000).save(outputDir)
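
Since you also asked about flattening to CSV: below is a minimal sketch (not a drop-in implementation) that combines your flatten_json idea with the partitioned write, assuming the same input path as above and a placeholder S3 output path. CSV cannot store complex types, so any remaining array or map columns (e.g. moe_campaign_tags in your sample) are serialised to JSON strings first.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, ArrayType, MapType

spark = SparkSession.builder.appName("Flatten Json To Csv").getOrCreate()
spark.conf.set('spark.sql.caseSensitive', True)

def flatten_struct_cols(schema, prefix=""):
    # Recursively turn nested struct fields into flat columns: event.event_name -> event_event_name
    cols = []
    for field in schema.fields:
        name = prefix + field.name
        if isinstance(field.dataType, StructType):
            cols.extend(flatten_struct_cols(field.dataType, name + "."))
        else:
            cols.append(F.col(name).alias(name.replace(".", "_")))
    return cols

df = spark.read.json("../data/json5mfile.json")   # same input as above
flat_df = df.select(*flatten_struct_cols(df.schema))

# CSV cannot hold arrays/maps, so serialise them to JSON strings.
for field in flat_df.schema.fields:
    if isinstance(field.dataType, (ArrayType, MapType)):
        flat_df = flat_df.withColumn(field.name, F.to_json(F.col(field.name)))

(flat_df
 .write
 .partitionBy("event_event_name")                 # flattened name of event.event_name
 .option("header", True)
 .mode("overwrite")
 .csv("s3://your-bucket/flattened_csv_output/"))  # placeholder S3 path

Because the schema is dynamic, the inferred schema is the union of all records, so records that lack a field simply get nulls in the corresponding CSV column.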

Output: the files written out on disk

data_output/
└── partitioned_output
    ├── event_name_partition=Received1
    │   └── part-00000-c3a58e7c-7a94-46f4-b2ac-aa21f11e6ab7.c000.json
    ├── event_name_partition=Received2
    │   └── part-00000-c3a58e7c-7a94-46f4-b2ac-aa21f11e6ab7.c000.json
    ├── event_name_partition=Received3
    │   └── part-00000-c3a58e7c-7a94-46f4-b2ac-aa21f11e6ab7.c000.json
    └── _SUCCESS
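
If you need exactly one file per event_name, one post-processing sketch (my assumption, not part of the write above) is to read the partitioned output back, collect the distinct partition values, and rewrite each slice through coalesce(1). mergedDir is a placeholder path, and coalesce(1) is only reasonable while each event_name's data fits comfortably on a single executor.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Merge Partitioned Output").getOrCreate()

outputDir = "../data_output/partitioned_output"   # directory written above
mergedDir = "../data_output/merged_output"        # placeholder destination

# Spark rediscovers event_name_partition from the partition directory names.
df = spark.read.option("basePath", outputDir).json(outputDir)

event_names = [row["event_name_partition"]
               for row in df.select("event_name_partition").distinct().collect()]

for name in event_names:
    (df.filter(F.col("event_name_partition") == name)
       .coalesce(1)                                # one output file per event_name
       .write
       .mode("overwrite")
       .json(f"{mergedDir}/event_name_partition={name}"))

This also gives you the distinct list of event.event_name values as a by-product, which you said you do not have up front.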