Applying transformations to nested json columns in a dataframe


I have a scenario where an S3 bucket will contain multiple json files, and my PySpark script loads them into a dataframe.

Each json will have the following structure -

[
    {
        "RECORDS_HEADER": {
            "RECORD_TYPE": "HEADER",
            "DATA": {
                "COUNT_OF_OBJECTS_RECORD_TYPE_001": 1,
                "COUNT_OF_OBJECTS_RECORD_TYPE_002": 3,
                "COUNT_OF_OBJECTS_RECORD_TYPE_003": 6,
                "COUNT_OF_OBJECTS_RECORD_TYPE_004": 3,
                "COUNT_OF_OBJECTS_RECORD_TYPE_005": 1,
                "COUNT_OF_OBJECTS_RECORD_TYPE_006": 1,
                "COUNT_OF_OBJECTS_RECORD_TYPE_007": 0
            }
        },
        "RECORDS_001": {
            "RECORD_TYPE": "001",
            "DATA": {
                "MEMBER_ID": "U3652928373",
                "INDIV_ID": "3003790",
                "MBR_PREF_LANG": "English",
                "MBR_PREF_LARGE_PRINT": "N",
                "BATCH_BEGIN_DATE": "01/01/2023",
                "BATCH_END_DATE": "11/30/2023",
                "BATCH_RUN_DATE": "12/18/2023",
                "BATCH_ID": 1000000078,
                "BATCH_STATUS": "COMPLETE",
                "COLATERAL_TYPE": "EOB"
            }
        },
        "RECORDS_002": {
            "RECORD_TYPE": "002",
            "DATA": [
                {
                    "CLAIM_NUMBER": "232423113200",
                    "RECEIVED_DATE": "08/30/2023",
                    "PROCESSED_DATE": "09/16/2023",
                    "PAID_DATE": "09/15/2023",
                    "CLAIM_DOS_BEGIN_DT": "08/29/2023",
                    "CLAIM_DOS_END_DT": "08/29/2023",
                    "CONTRACT_ID": "H0354",
                    "PBP": "027",
                    "SEGMENT": "000",
                    "GROUP_ID": "",
                    "CLASS_ID": "",
                    "CLASS_PLAN_ID": "",
                    "RENDERING_PROVIDER_NAME": "ESCHLER, DAVID J",
                    "CLAIM_TYP": "H",
                    "PAR_IND": "Y",
                    "CLAIM_QMB_ELIG": "N"
                },
                {
                    "CLAIM_NUMBER": "23P001125500",
                    "RECEIVED_DATE": "05/30/2023",
                    "PROCESSED_DATE": "06/16/2023",
                    "PAID_DATE": "06/15/2023",
                    "CLAIM_DOS_BEGIN_DT": "05/25/2023",
                    "CLAIM_DOS_END_DT": "05/25/2023",
                    "CONTRACT_ID": "H0354",
                    "PBP": "001",
                    "SEGMENT": "000",
                    "GROUP_ID": "",
                    "CLASS_ID": "",
                    "CLASS_PLAN_ID": "",
                    "RENDERING_PROVIDER_NAME": "PRAC HH, MHK HOME G.",
                    "CLAIM_TYP": "H",
                    "PAR_IND": "Y",
                    "CLAIM_QMB_ELIG": "N"
                },
                {
                    "CLAIM_NUMBER": "23E002114300",
                    "RECEIVED_DATE": "01/30/2024",
                    "PROCESSED_DATE": "01/16/2024",
                    "PAID_DATE": "01/15/2024",
                    "CLAIM_DOS_BEGIN_DT": "01/12/2024",
                    "CLAIM_DOS_END_DT": "01/12/2024",
                    "CONTRACT_ID": "H0354",
                    "PBP": "028",
                    "SEGMENT": "000",
                    "GROUP_ID": "",
                    "CLASS_ID": "",
                    "CLASS_PLAN_ID": "",
                    "RENDERING_PROVIDER_NAME": "ZEN, MAX",
                    "CLAIM_TYP": "H",
                    "PAR_IND": "Y",
                    "CLAIM_QMB_ELIG": "N"
                }
            ]
        },
        "RECORDS_003": {
            "RECORD_TYPE": "003",
            "DATA": [
                {
                    "CLAIM_NUMBER": "23E002113200",
                    "CLAIM_LINE_NUMBER": "2",
                    "CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
                    "CLAIMLN_DOS_END_DT": "08/29/2023",
                    "BILLING_CODE": "96401",
                    "BILLING_CODE_DESC": "Complex medication injected",
                    "LINE_NOTES_CODE": "B",
                    "TOTAL_CHARGE": "475.00",
                    "ALLOWED": "0.00",
                    "PAID_AMT": "0.00",
                    "PATIENT_RESPONSIBILITY": "0.00",
                    "DEDUCTIBLE": "0.00",
                    "COINSURANCE": "0.00",
                    "COINSURANCE_PCT": "0.00",
                    "COPAY": "0.00",
                    "CLAIM_STATUS_DENIED": "Y",
                    "OVERAGE_AMOUNT": "0.00"
                },
                {
                    "CLAIM_NUMBER": "23E002113200",
                    "CLAIM_LINE_NUMBER": "4",
                    "CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
                    "CLAIMLN_DOS_END_DT": "08/29/2023",
                    "BILLING_CODE": "96401",
                    "BILLING_CODE_DESC": "Complex medication injected",
                    "LINE_NOTES_CODE": "B",
                    "TOTAL_CHARGE": "475.00",
                    "ALLOWED": "0.00",
                    "PAID_AMT": "0.00",
                    "PATIENT_RESPONSIBILITY": "0.00",
                    "DEDUCTIBLE": "0.00",
                    "COINSURANCE": "0.00",
                    "COINSURANCE_PCT": "0.00",
                    "COPAY": "0.00",
                    "CLAIM_STATUS_DENIED": "Y",
                    "OVERAGE_AMOUNT": "0.00"
                }
            ]
        },
        "RECORDS_004": {
            "RECORD_TYPE": "004",
            "DATA": [
                {
                    "CLAIM_NUMBER": "123443536",
                    "CLAIM_TOTAL_CHARGE": "1800.00",
                    "CLAIM_ALLOWED": "900.00",
                    "CLAIM_PAID_AMT": "700.00",
                    "CLAIM_PATIENT_RESPONSIBILITY": "200.00",
                    "CLAIM_COPAY": "200.00",
                    "CLAIM_COINSURANCE": "0.00",
                    "CLAIM_DEDUCTIBLE": "0.00",
                    "CLAIM_OVERAGE_AMOUNT": "0.00"
                },
                {
                    "CLAIM_NUMBER": "123443536",
                    "CLAIM_TOTAL_CHARGE": "1000.00",
                    "CLAIM_ALLOWED": "335.77",
                    "CLAIM_PAID_AMT": "268.62",
                    "CLAIM_PATIENT_RESPONSIBILITY": "67.15",
                    "CLAIM_COPAY": "0.00",
                    "CLAIM_COINSURANCE": "67.15",
                    "CLAIM_DEDUCTIBLE": "0.00",
                    "CLAIM_OVERAGE_AMOUNT": "0.00"
                },
                {
                    "CLAIM_NUMBER": "123443536",
                    "CLAIM_TOTAL_CHARGE": "1900.00",
                    "CLAIM_ALLOWED": "0.00",
                    "CLAIM_PAID_AMT": "0.00",
                    "CLAIM_PATIENT_RESPONSIBILITY": "0.00",
                    "CLAIM_COPAY": "0.00",
                    "CLAIM_COINSURANCE": "0.00",
                    "CLAIM_DEDUCTIBLE": "0.00",
                    "CLAIM_OVERAGE_AMOUNT": "0.00"
                }
            ]
        },
        "RECORDS_005": {
            "RECORD_TYPE": "005",
            "DATA": {
                "PROCESSED_PERIOD_AMOUNT_CHARGED": "4700.00",
                "PROCESSED_PERIOD_ALLOWED_AMT": "1235.77",
                "PROCESSED_PERIOD_PAID_AMT": "968.62",
                "PROCESSED_PERIOD_PATIENT_RESPONSIBILITY": "267.15"
            }
        },
        "RECORDS_006": {
            "RECORD_TYPE": "006",
            "DATA": {
                "YEAR_BEGIN_DT": "01/01/2024",
                "YEAR_END_DT": "12/31/2024",
                "START_DATE": "01/01/2024",
                "END_DATE": "11/30/2024"                
            }
        }
    }
]

Once all the json files are loaded into a single dataframe, the dataframe looks like this -

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|         RECORDS_001|         RECORDS_002|         RECORDS_003|         RECORDS_004|         RECORDS_005|         RECORDS_006|      RECORDS_HEADER|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{{01/01/2023, 11/...|{[{08/29/2023, 08...|{[{0.00, 96401, C...|{[{900.00, 0.00, ...|{{1235.77, 4700.0...|{{B, 11/30/2024, ...|{{1, 3, 6, 3, 1, ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

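Now I need to apply transformations inside the nested json. For example, in RECORDS_002 I need to change the date format of the PROCESSED_DATE field for every record within a row, and for all rows. Likewise, other nested json fields need different transformations.

Please let me know how to perform transformations on nested json fields in a PySpark dataframe without making the process too slow.

I'm happy to answer any further questions if needed.

Thanks!!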

amazon-web-services pyspark aws-glue
1 Answer

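Take a look at this. Spark SQL's higher-order function transform applies an expression to each element of an array, so the dates can be converted without exploding RECORDS_002.DATA: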

import pyspark.sql.functions as f

df = (
  # multiline is needed because the file is a pretty-printed JSON array
  spark.read.option('multiline', True).json('./test_json.json')
  # transform() applies the lambda to every element of the PROCESSED_DATE array
  .withColumn('RECORDS002_PROCESSED_DATE_transformed', f.expr('transform(RECORDS_002.DATA.PROCESSED_DATE, x -> date_format(to_timestamp(x, "MM/dd/yyyy"), "yyyy-MM-dd"))'))
  .select('RECORDS_002.DATA.PROCESSED_DATE', 'RECORDS002_PROCESSED_DATE_transformed')
)

df.show(truncate=False)

The output is:

+------------------------------------+-------------------------------------+
|PROCESSED_DATE                      |RECORDS002_PROCESSED_DATE_transformed|
+------------------------------------+-------------------------------------+
|[09/16/2023, 06/16/2023, 01/16/2024]|[2023-09-16, 2023-06-16, 2024-01-16] |
+------------------------------------+-------------------------------------+