PySpark 应用多个转换需要太长时间

问题描述 投票:0回答:1

我有一个场景,S3 存储桶中会有多个 json 文件,我的 Pyspark 脚本会将其加载到数据帧中。

每个 json 将具有以下结构 -

[
    {
        "RECORDS_HEADER": {
            "RECORD_TYPE": "HEADER",
            "DATA": {
                "COUNT_OF_OBJECTS_RECORD_TYPE_001": 1,
                "COUNT_OF_OBJECTS_RECORD_TYPE_002": 3,
                "COUNT_OF_OBJECTS_RECORD_TYPE_003": 6,
                "COUNT_OF_OBJECTS_RECORD_TYPE_004": 3,
                "COUNT_OF_OBJECTS_RECORD_TYPE_005": 1,
                "COUNT_OF_OBJECTS_RECORD_TYPE_006": 1,
                "COUNT_OF_OBJECTS_RECORD_TYPE_007": 0
            }
        },
        "RECORDS_001": {
            "RECORD_TYPE": "001",
            "DATA": {
                "MEMBER_ID": "U3652928373",
                "INDIV_ID": "3003790",
                "MBR_PREF_LANG": "English",
                "MBR_PREF_LARGE_PRINT": "N",
                "BATCH_BEGIN_DATE": "01/01/2023",
                "BATCH_END_DATE": "11/30/2023",
                "BATCH_RUN_DATE": "12/18/2023",
                "BATCH_ID": 1000000078,
                "BATCH_STATUS": "COMPLETE",
                "COLATERAL_TYPE": "EOB"
            }
        },
        "RECORDS_002": {
            "RECORD_TYPE": "002",
            "DATA": [
                {
                    "CLAIM_NUMBER": "232423113200",
                    "RECEIVED_DATE": "08/30/2023",
                    "PROCESSED_DATE": "09/16/2023",
                    "PAID_DATE": "09/15/2023",
                    "CLAIM_DOS_BEGIN_DT": "08/29/2023",
                    "CLAIM_DOS_END_DT": "08/29/2023",
                    "CONTRACT_ID": "H0354",
                    "PBP": "027",
                    "SEGMENT": "000",
                    "GROUP_ID": "",
                    "CLASS_ID": "",
                    "CLASS_PLAN_ID": "",
                    "RENDERING_PROVIDER_NAME": "ESCHLER, DAVID J",
                    "CLAIM_TYP": "H",
                    "PAR_IND": "Y",
                    "CLAIM_QMB_ELIG": "N"
                },
                {
                    "CLAIM_NUMBER": "23P001125500",
                    "RECEIVED_DATE": "05/30/2023",
                    "PROCESSED_DATE": "06/16/2023",
                    "PAID_DATE": "06/15/2023",
                    "CLAIM_DOS_BEGIN_DT": "05/25/2023",
                    "CLAIM_DOS_END_DT": "05/25/2023",
                    "CONTRACT_ID": "H0354",
                    "PBP": "001",
                    "SEGMENT": "000",
                    "GROUP_ID": "",
                    "CLASS_ID": "",
                    "CLASS_PLAN_ID": "",
                    "RENDERING_PROVIDER_NAME": "PRAC HH, MHK HOME G.",
                    "CLAIM_TYP": "H",
                    "PAR_IND": "Y",
                    "CLAIM_QMB_ELIG": "N"
                },
                {
                    "CLAIM_NUMBER": "23E002114300",
                    "RECEIVED_DATE": "01/30/2024",
                    "PROCESSED_DATE": "01/16/2024",
                    "PAID_DATE": "01/15/2024",
                    "CLAIM_DOS_BEGIN_DT": "01/12/2024",
                    "CLAIM_DOS_END_DT": "01/12/2024",
                    "CONTRACT_ID": "H0354",
                    "PBP": "028",
                    "SEGMENT": "000",
                    "GROUP_ID": "",
                    "CLASS_ID": "",
                    "CLASS_PLAN_ID": "",
                    "RENDERING_PROVIDER_NAME": "ZEN, MAX",
                    "CLAIM_TYP": "H",
                    "PAR_IND": "Y",
                    "CLAIM_QMB_ELIG": "N"
                }
            ]
        },
        "RECORDS_003": {
            "RECORD_TYPE": "003",
            "DATA": [
                {
                    "CLAIM_NUMBER": "23E002113200",
                    "CLAIM_LINE_NUMBER": "2",
                    "CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
                    "CLAIMLN_DOS_END_DT": "08/29/2023",
                    "BILLING_CODE": "96401",
                    "BILLING_CODE_DESC": "Complex medication injected",
                    "LINE_NOTES_CODE": "B",
                    "TOTAL_CHARGE": "475.00",
                    "ALLOWED": "0.00",
                    "PAID_AMT": "0.00",
                    "PATIENT_RESPONSIBILITY": "0.00",
                    "DEDUCTIBLE": "0.00",
                    "COINSURANCE": "0.00",
                    "COINSURANCE_PCT": "0.00",
                    "COPAY": "0.00",
                    "CLAIM_STATUS_DENIED": "Y",
                    "OVERAGE_AMOUNT": "0.00"
                },
                {
                    "CLAIM_NUMBER": "23E002113200",
                    "CLAIM_LINE_NUMBER": "4",
                    "CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
                    "CLAIMLN_DOS_END_DT": "08/29/2023",
                    "BILLING_CODE": "96401",
                    "BILLING_CODE_DESC": "Complex medication injected",
                    "LINE_NOTES_CODE": "B",
                    "TOTAL_CHARGE": "475.00",
                    "ALLOWED": "0.00",
                    "PAID_AMT": "0.00",
                    "PATIENT_RESPONSIBILITY": "0.00",
                    "DEDUCTIBLE": "0.00",
                    "COINSURANCE": "0.00",
                    "COINSURANCE_PCT": "0.00",
                    "COPAY": "0.00",
                    "CLAIM_STATUS_DENIED": "Y",
                    "OVERAGE_AMOUNT": "0.00"
                }
            ]
        },
        "RECORDS_004": {
            "RECORD_TYPE": "004",
            "DATA": [
                {
                    "CLAIM_NUMBER": "123443536",
                    "CLAIM_TOTAL_CHARGE": "1800.00",
                    "CLAIM_ALLOWED": "900.00",
                    "CLAIM_PAID_AMT": "700.00",
                    "CLAIM_PATIENT_RESPONSIBILITY": "200.00",
                    "CLAIM_COPAY": "200.00",
                    "CLAIM_COINSURANCE": "0.00",
                    "CLAIM_DEDUCTIBLE": "0.00",
                    "CLAIM_OVERAGE_AMOUNT": "0.00"
                },
                {
                    "CLAIM_NUMBER": "123443536",
                    "CLAIM_TOTAL_CHARGE": "1000.00",
                    "CLAIM_ALLOWED": "335.77",
                    "CLAIM_PAID_AMT": "268.62",
                    "CLAIM_PATIENT_RESPONSIBILITY": "67.15",
                    "CLAIM_COPAY": "0.00",
                    "CLAIM_COINSURANCE": "67.15",
                    "CLAIM_DEDUCTIBLE": "0.00",
                    "CLAIM_OVERAGE_AMOUNT": "0.00"
                },
                {
                    "CLAIM_NUMBER": "123443536",
                    "CLAIM_TOTAL_CHARGE": "1900.00",
                    "CLAIM_ALLOWED": "0.00",
                    "CLAIM_PAID_AMT": "0.00",
                    "CLAIM_PATIENT_RESPONSIBILITY": "0.00",
                    "CLAIM_COPAY": "0.00",
                    "CLAIM_COINSURANCE": "0.00",
                    "CLAIM_DEDUCTIBLE": "0.00",
                    "CLAIM_OVERAGE_AMOUNT": "0.00"
                }
            ]
        },
        "RECORDS_005": {
            "RECORD_TYPE": "005",
            "DATA": {
                "PROCESSED_PERIOD_AMOUNT_CHARGED": "4700.00",
                "PROCESSED_PERIOD_ALLOWED_AMT": "1235.77",
                "PROCESSED_PERIOD_PAID_AMT": "968.62",
                "PROCESSED_PERIOD_PATIENT_RESPONSIBILITY": "267.15"
            }
        },
        "RECORDS_006": {
            "RECORD_TYPE": "006",
            "DATA": {
                "YEAR_BEGIN_DT": "01/01/2024",
                "YEAR_END_DT": "12/31/2024",
                "START_DATE": "01/01/2024",
                "END_DATE": "11/30/2024"                
            }
        }
    }
]

当所有 json 文件都加载到单个数据帧中时,数据帧看起来像下面的格式 -

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|         RECORDS_001|         RECORDS_002|         RECORDS_003|         RECORDS_004|         RECORDS_005|         RECORDS_006|      RECORDS_HEADER|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{{01/01/2023, 11/...|{[{08/29/2023, 08...|{[{0.00, 96401, C...|{[{900.00, 0.00, ...|{{1235.77, 4700.0...|{{B, 11/30/2024, ...|{{1, 3, 6, 3, 1, ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

现在我需要对嵌套 json 列下的每一行进行多次验证。

所以我创建了下面的表达式列表 -

["CASE WHEN RECORDS_001.DATA.MEMBER_ID = '' OR RECORDS_001.DATA.MEMBER_ID IS NULL THEN 'MEMBER_ID is missing' ELSE NULL END",
                     "CASE WHEN RECORDS_001.DATA.MBR_PREF_LANG = '' OR RECORDS_001.DATA.MBR_PREF_LANG IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'MBR_PREF_LANG is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_001.DATA.MBR_PREF_LARGE_PRINT = '' OR RECORDS_001.DATA.MBR_PREF_LARGE_PRINT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'MBR_PREF_LARGE_PRINT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_005.DATA.PROCESSED_PERIOD_AMOUNT_CHARGED = '' OR RECORDS_005.DATA.PROCESSED_PERIOD_AMOUNT_CHARGED IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PROCESSED_PERIOD_AMOUNT_CHARGED is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_005.DATA.PROCESSED_PERIOD_ALLOWED_AMT = '' OR RECORDS_005.DATA.PROCESSED_PERIOD_ALLOWED_AMT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PROCESSED_PERIOD_ALLOWED_AMT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_005.DATA.PROCESSED_PERIOD_PAID_AMT = '' OR RECORDS_005.DATA.PROCESSED_PERIOD_PAID_AMT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PROCESSED_PERIOD_PAID_AMT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_001.DATA.BATCH_BEGIN_DATE = '' OR RECORDS_001.DATA.BATCH_BEGIN_DATE IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'BATCH_BEGIN_DATE is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_001.DATA.BATCH_END_DATE = '' OR RECORDS_001.DATA.BATCH_END_DATE IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'BATCH_END_DATE is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_006.DATA.PLAN_DEDUCTIBLE = '' OR RECORDS_006.DATA.PLAN_DEDUCTIBLE IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PLAN_DEDUCTIBLE is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_006.DATA.SERVICE_CTG_IND = '' OR RECORDS_006.DATA.SERVICE_CTG_IND IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'SERVICE_CTG_IND is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_006.DATA.YT_ALLOWED_AMOUNT = '' OR RECORDS_006.DATA.YT_ALLOWED_AMOUNT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'YT_ALLOWED_AMOUNT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_006.DATA.YT_AMOUNT_CHARGED = '' OR RECORDS_006.DATA.YT_AMOUNT_CHARGED IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'YT_AMOUNT_CHARGED is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_006.DATA.YT_PAID_AMT = '' OR RECORDS_006.DATA.YT_PAID_AMT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'YT_PAID_AMT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_006.DATA.YT_MEMLIABILITY_AMT = '' OR RECORDS_006.DATA.YT_MEMLIABILITY_AMT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'YT_MEMLIABILITY_AMT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN RECORDS_005.DATA.PROCESSED_PERIOD_PATIENT_RESPONSIBILITY = '' OR RECORDS_005.DATA.PROCESSED_PERIOD_PATIENT_RESPONSIBILITY IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PROCESSED_PERIOD_PATIENT_RESPONSIBILITY is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_003.DATA.TOTAL_CHARGE, x -> trim(x)), '') OR exists(RECORDS_003.DATA.TOTAL_CHARGE, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'TOTAL_CHARGE is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_003.DATA.PATIENT_RESPONSIBILITY, x -> trim(x)), '') OR exists(RECORDS_003.DATA.PATIENT_RESPONSIBILITY, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'PATIENT_RESPONSIBILITY is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_004.DATA.CLAIM_TOTAL_CHARGE, x -> trim(x)), '') OR exists(RECORDS_004.DATA.CLAIM_TOTAL_CHARGE, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_TOTAL_CHARGE is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_004.DATA.CLAIM_ALLOWED, x -> trim(x)), '') OR exists(RECORDS_004.DATA.CLAIM_ALLOWED, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_ALLOWED is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_004.DATA.CLAIM_PAID_AMT, x -> trim(x)), '') OR exists(RECORDS_004.DATA.CLAIM_PAID_AMT, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_PAID_AMT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_004.DATA.CLAIM_PATIENT_RESPONSIBILITY, x -> trim(x)), '') OR exists(RECORDS_004.DATA.CLAIM_PATIENT_RESPONSIBILITY, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_PATIENT_RESPONSIBILITY is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_002.DATA.CLAIM_DOS_BEGIN_DT, x -> trim(x)), '') OR exists(RECORDS_002.DATA.CLAIM_DOS_BEGIN_DT, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_DOS_BEGIN_DT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_002.DATA.CLAIM_DOS_END_DT, x -> trim(x)), '') OR exists(RECORDS_002.DATA.CLAIM_DOS_END_DT, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_DOS_END_DT is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_002.DATA.CLAIM_NUMBER, x -> trim(x)), '') OR exists(RECORDS_002.DATA.CLAIM_NUMBER, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_NUMBER is missing') ELSE FAILED_VALIDATIONS END",
                     "CASE WHEN array_contains(transform(RECORDS_007.DATA.PREV_PLAN_OOP_COMBINED_AMT, x -> trim(x)), '') OR exists(RECORDS_007.DATA.PREV_PLAN_OOP_COMBINED_AMT, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'PREV_PLAN_OOP_COMBINED_AMT is missing') ELSE FAILED_VALIDATIONS END"
                     ]

为了应用上述验证,我创建了以下方法 -

def member_records_validations(self, input_df):
        input_validated_df = input_df
        for validations in FIELD_VALIDATIONS:
            input_validated_df = input_validated_df.withColumn('FAILED_VALIDATIONS',
                                                               expr(validations))

        input_validated_df = input_validated_df.withColumn('VALIDATION_STATUS', when(
            col('FAILED_VALIDATIONS').isNull(), 'PASSED').otherwise('FAILED'))

        return input_validated_df

因此,在 baove 方法中,我创建新列 FAILED_VALIDATIONS 并继续添加为 null 或空的字段名称。

但是即使只针对 4 个输入 json 文件,上述方法运行时间也太长。 有人可以告诉我如何优化这个吗? 如果需要的话,我愿意回答任何进一步的问题。

谢谢!!

python apache-spark pyspark
1个回答
0
投票

不要使用 withColumn 或 withColumnRenamed 进行此类转换,将所有列添加为单个选择的一部分(临时缓存等既不需要,也不具有高性能)。

查询计划看起来像是经过优化的,但事实并非如此。 基于 Scala 的方法证据

此外,作为警告,构建像这样的大型表达式树可能导致构建操作缓慢(甚至生成的代码大小)。这是采用各种Quality引擎的方法的原因之一。

© www.soinside.com 2019 - 2024. All rights reserved.