我有一个场景,S3 存储桶中会有多个 json 文件,我的 Pyspark 脚本会将其加载到数据帧中。
每个 json 将具有以下结构 -
[
{
"RECORDS_HEADER": {
"RECORD_TYPE": "HEADER",
"DATA": {
"COUNT_OF_OBJECTS_RECORD_TYPE_001": 1,
"COUNT_OF_OBJECTS_RECORD_TYPE_002": 3,
"COUNT_OF_OBJECTS_RECORD_TYPE_003": 6,
"COUNT_OF_OBJECTS_RECORD_TYPE_004": 3,
"COUNT_OF_OBJECTS_RECORD_TYPE_005": 1,
"COUNT_OF_OBJECTS_RECORD_TYPE_006": 1,
"COUNT_OF_OBJECTS_RECORD_TYPE_007": 0
}
},
"RECORDS_001": {
"RECORD_TYPE": "001",
"DATA": {
"MEMBER_ID": "U3652928373",
"INDIV_ID": "3003790",
"MBR_PREF_LANG": "English",
"MBR_PREF_LARGE_PRINT": "N",
"BATCH_BEGIN_DATE": "01/01/2023",
"BATCH_END_DATE": "11/30/2023",
"BATCH_RUN_DATE": "12/18/2023",
"BATCH_ID": 1000000078,
"BATCH_STATUS": "COMPLETE",
"COLATERAL_TYPE": "EOB"
}
},
"RECORDS_002": {
"RECORD_TYPE": "002",
"DATA": [
{
"CLAIM_NUMBER": "232423113200",
"RECEIVED_DATE": "08/30/2023",
"PROCESSED_DATE": "09/16/2023",
"PAID_DATE": "09/15/2023",
"CLAIM_DOS_BEGIN_DT": "08/29/2023",
"CLAIM_DOS_END_DT": "08/29/2023",
"CONTRACT_ID": "H0354",
"PBP": "027",
"SEGMENT": "000",
"GROUP_ID": "",
"CLASS_ID": "",
"CLASS_PLAN_ID": "",
"RENDERING_PROVIDER_NAME": "ESCHLER, DAVID J",
"CLAIM_TYP": "H",
"PAR_IND": "Y",
"CLAIM_QMB_ELIG": "N"
},
{
"CLAIM_NUMBER": "23P001125500",
"RECEIVED_DATE": "05/30/2023",
"PROCESSED_DATE": "06/16/2023",
"PAID_DATE": "06/15/2023",
"CLAIM_DOS_BEGIN_DT": "05/25/2023",
"CLAIM_DOS_END_DT": "05/25/2023",
"CONTRACT_ID": "H0354",
"PBP": "001",
"SEGMENT": "000",
"GROUP_ID": "",
"CLASS_ID": "",
"CLASS_PLAN_ID": "",
"RENDERING_PROVIDER_NAME": "PRAC HH, MHK HOME G.",
"CLAIM_TYP": "H",
"PAR_IND": "Y",
"CLAIM_QMB_ELIG": "N"
},
{
"CLAIM_NUMBER": "23E002114300",
"RECEIVED_DATE": "01/30/2024",
"PROCESSED_DATE": "01/16/2024",
"PAID_DATE": "01/15/2024",
"CLAIM_DOS_BEGIN_DT": "01/12/2024",
"CLAIM_DOS_END_DT": "01/12/2024",
"CONTRACT_ID": "H0354",
"PBP": "028",
"SEGMENT": "000",
"GROUP_ID": "",
"CLASS_ID": "",
"CLASS_PLAN_ID": "",
"RENDERING_PROVIDER_NAME": "ZEN, MAX",
"CLAIM_TYP": "H",
"PAR_IND": "Y",
"CLAIM_QMB_ELIG": "N"
}
]
},
"RECORDS_003": {
"RECORD_TYPE": "003",
"DATA": [
{
"CLAIM_NUMBER": "23E002113200",
"CLAIM_LINE_NUMBER": "2",
"CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
"CLAIMLN_DOS_END_DT": "08/29/2023",
"BILLING_CODE": "96401",
"BILLING_CODE_DESC": "Complex medication injected",
"LINE_NOTES_CODE": "B",
"TOTAL_CHARGE": "475.00",
"ALLOWED": "0.00",
"PAID_AMT": "0.00",
"PATIENT_RESPONSIBILITY": "0.00",
"DEDUCTIBLE": "0.00",
"COINSURANCE": "0.00",
"COINSURANCE_PCT": "0.00",
"COPAY": "0.00",
"CLAIM_STATUS_DENIED": "Y",
"OVERAGE_AMOUNT": "0.00"
},
{
"CLAIM_NUMBER": "23E002113200",
"CLAIM_LINE_NUMBER": "4",
"CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
"CLAIMLN_DOS_END_DT": "08/29/2023",
"BILLING_CODE": "96401",
"BILLING_CODE_DESC": "Complex medication injected",
"LINE_NOTES_CODE": "B",
"TOTAL_CHARGE": "475.00",
"ALLOWED": "0.00",
"PAID_AMT": "0.00",
"PATIENT_RESPONSIBILITY": "0.00",
"DEDUCTIBLE": "0.00",
"COINSURANCE": "0.00",
"COINSURANCE_PCT": "0.00",
"COPAY": "0.00",
"CLAIM_STATUS_DENIED": "Y",
"OVERAGE_AMOUNT": "0.00"
}
]
},
"RECORDS_004": {
"RECORD_TYPE": "004",
"DATA": [
{
"CLAIM_NUMBER": "123443536",
"CLAIM_TOTAL_CHARGE": "1800.00",
"CLAIM_ALLOWED": "900.00",
"CLAIM_PAID_AMT": "700.00",
"CLAIM_PATIENT_RESPONSIBILITY": "200.00",
"CLAIM_COPAY": "200.00",
"CLAIM_COINSURANCE": "0.00",
"CLAIM_DEDUCTIBLE": "0.00",
"CLAIM_OVERAGE_AMOUNT": "0.00"
},
{
"CLAIM_NUMBER": "123443536",
"CLAIM_TOTAL_CHARGE": "1000.00",
"CLAIM_ALLOWED": "335.77",
"CLAIM_PAID_AMT": "268.62",
"CLAIM_PATIENT_RESPONSIBILITY": "67.15",
"CLAIM_COPAY": "0.00",
"CLAIM_COINSURANCE": "67.15",
"CLAIM_DEDUCTIBLE": "0.00",
"CLAIM_OVERAGE_AMOUNT": "0.00"
},
{
"CLAIM_NUMBER": "123443536",
"CLAIM_TOTAL_CHARGE": "1900.00",
"CLAIM_ALLOWED": "0.00",
"CLAIM_PAID_AMT": "0.00",
"CLAIM_PATIENT_RESPONSIBILITY": "0.00",
"CLAIM_COPAY": "0.00",
"CLAIM_COINSURANCE": "0.00",
"CLAIM_DEDUCTIBLE": "0.00",
"CLAIM_OVERAGE_AMOUNT": "0.00"
}
]
},
"RECORDS_005": {
"RECORD_TYPE": "005",
"DATA": {
"PROCESSED_PERIOD_AMOUNT_CHARGED": "4700.00",
"PROCESSED_PERIOD_ALLOWED_AMT": "1235.77",
"PROCESSED_PERIOD_PAID_AMT": "968.62",
"PROCESSED_PERIOD_PATIENT_RESPONSIBILITY": "267.15"
}
},
"RECORDS_006": {
"RECORD_TYPE": "006",
"DATA": {
"YEAR_BEGIN_DT": "01/01/2024",
"YEAR_END_DT": "12/31/2024",
"START_DATE": "01/01/2024",
"END_DATE": "11/30/2024"
}
}
}
]
当所有 json 文件都加载到单个数据帧中时,数据帧看起来像下面的格式 -
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| RECORDS_001| RECORDS_002| RECORDS_003| RECORDS_004| RECORDS_005| RECORDS_006| RECORDS_HEADER|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{{01/01/2023, 11/...|{[{08/29/2023, 08...|{[{0.00, 96401, C...|{[{900.00, 0.00, ...|{{1235.77, 4700.0...|{{B, 11/30/2024, ...|{{1, 3, 6, 3, 1, ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
现在我需要对嵌套 json 列下的每一行进行多次验证。
所以我创建了下面的表达式列表 -
["CASE WHEN RECORDS_001.DATA.MEMBER_ID = '' OR RECORDS_001.DATA.MEMBER_ID IS NULL THEN 'MEMBER_ID is missing' ELSE NULL END",
"CASE WHEN RECORDS_001.DATA.MBR_PREF_LANG = '' OR RECORDS_001.DATA.MBR_PREF_LANG IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'MBR_PREF_LANG is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_001.DATA.MBR_PREF_LARGE_PRINT = '' OR RECORDS_001.DATA.MBR_PREF_LARGE_PRINT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'MBR_PREF_LARGE_PRINT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_005.DATA.PROCESSED_PERIOD_AMOUNT_CHARGED = '' OR RECORDS_005.DATA.PROCESSED_PERIOD_AMOUNT_CHARGED IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PROCESSED_PERIOD_AMOUNT_CHARGED is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_005.DATA.PROCESSED_PERIOD_ALLOWED_AMT = '' OR RECORDS_005.DATA.PROCESSED_PERIOD_ALLOWED_AMT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PROCESSED_PERIOD_ALLOWED_AMT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_005.DATA.PROCESSED_PERIOD_PAID_AMT = '' OR RECORDS_005.DATA.PROCESSED_PERIOD_PAID_AMT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PROCESSED_PERIOD_PAID_AMT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_001.DATA.BATCH_BEGIN_DATE = '' OR RECORDS_001.DATA.BATCH_BEGIN_DATE IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'BATCH_BEGIN_DATE is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_001.DATA.BATCH_END_DATE = '' OR RECORDS_001.DATA.BATCH_END_DATE IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'BATCH_END_DATE is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_006.DATA.PLAN_DEDUCTIBLE = '' OR RECORDS_006.DATA.PLAN_DEDUCTIBLE IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PLAN_DEDUCTIBLE is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_006.DATA.SERVICE_CTG_IND = '' OR RECORDS_006.DATA.SERVICE_CTG_IND IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'SERVICE_CTG_IND is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_006.DATA.YT_ALLOWED_AMOUNT = '' OR RECORDS_006.DATA.YT_ALLOWED_AMOUNT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'YT_ALLOWED_AMOUNT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_006.DATA.YT_AMOUNT_CHARGED = '' OR RECORDS_006.DATA.YT_AMOUNT_CHARGED IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'YT_AMOUNT_CHARGED is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_006.DATA.YT_PAID_AMT = '' OR RECORDS_006.DATA.YT_PAID_AMT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'YT_PAID_AMT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_006.DATA.YT_MEMLIABILITY_AMT = '' OR RECORDS_006.DATA.YT_MEMLIABILITY_AMT IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'YT_MEMLIABILITY_AMT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN RECORDS_005.DATA.PROCESSED_PERIOD_PATIENT_RESPONSIBILITY = '' OR RECORDS_005.DATA.PROCESSED_PERIOD_PATIENT_RESPONSIBILITY IS NULL THEN concat(FAILED_VALIDATIONS, ',', 'PROCESSED_PERIOD_PATIENT_RESPONSIBILITY is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_003.DATA.TOTAL_CHARGE, x -> trim(x)), '') OR exists(RECORDS_003.DATA.TOTAL_CHARGE, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'TOTAL_CHARGE is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_003.DATA.PATIENT_RESPONSIBILITY, x -> trim(x)), '') OR exists(RECORDS_003.DATA.PATIENT_RESPONSIBILITY, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'PATIENT_RESPONSIBILITY is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_004.DATA.CLAIM_TOTAL_CHARGE, x -> trim(x)), '') OR exists(RECORDS_004.DATA.CLAIM_TOTAL_CHARGE, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_TOTAL_CHARGE is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_004.DATA.CLAIM_ALLOWED, x -> trim(x)), '') OR exists(RECORDS_004.DATA.CLAIM_ALLOWED, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_ALLOWED is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_004.DATA.CLAIM_PAID_AMT, x -> trim(x)), '') OR exists(RECORDS_004.DATA.CLAIM_PAID_AMT, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_PAID_AMT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_004.DATA.CLAIM_PATIENT_RESPONSIBILITY, x -> trim(x)), '') OR exists(RECORDS_004.DATA.CLAIM_PATIENT_RESPONSIBILITY, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_PATIENT_RESPONSIBILITY is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_002.DATA.CLAIM_DOS_BEGIN_DT, x -> trim(x)), '') OR exists(RECORDS_002.DATA.CLAIM_DOS_BEGIN_DT, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_DOS_BEGIN_DT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_002.DATA.CLAIM_DOS_END_DT, x -> trim(x)), '') OR exists(RECORDS_002.DATA.CLAIM_DOS_END_DT, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_DOS_END_DT is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_002.DATA.CLAIM_NUMBER, x -> trim(x)), '') OR exists(RECORDS_002.DATA.CLAIM_NUMBER, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'CLAIM_NUMBER is missing') ELSE FAILED_VALIDATIONS END",
"CASE WHEN array_contains(transform(RECORDS_007.DATA.PREV_PLAN_OOP_COMBINED_AMT, x -> trim(x)), '') OR exists(RECORDS_007.DATA.PREV_PLAN_OOP_COMBINED_AMT, x -> x IS NULL) THEN concat(FAILED_VALIDATIONS, ',', 'PREV_PLAN_OOP_COMBINED_AMT is missing') ELSE FAILED_VALIDATIONS END"
]
为了应用上述验证,我创建了以下方法 -
def member_records_validations(self, input_df):
    """Apply every SQL rule in FIELD_VALIDATIONS to input_df.

    Each rule is a Spark SQL CASE expression that rewrites the
    FAILED_VALIDATIONS column, appending a message when its field is
    empty/null; the rules are order-dependent because each one reads the
    FAILED_VALIDATIONS value produced by the previous one. A final
    VALIDATION_STATUS column is derived from whether any message was
    accumulated.

    NOTE(review): each withColumn call adds another projection to the
    logical plan and re-triggers analysis, which presumably is the source
    of the slowness reported for this loop — TODO confirm with the plan.

    :param input_df: DataFrame loaded from the S3 JSON files.
    :return: input_df plus FAILED_VALIDATIONS and VALIDATION_STATUS columns.
    """
    validated = input_df
    # Rules must run sequentially: rule N reads the FAILED_VALIDATIONS
    # column written by rule N-1.
    for rule in FIELD_VALIDATIONS:
        validated = validated.withColumn('FAILED_VALIDATIONS', expr(rule))
    # A row passes only when no rule appended a failure message.
    status = when(col('FAILED_VALIDATIONS').isNull(), 'PASSED').otherwise('FAILED')
    return validated.withColumn('VALIDATION_STATUS', status)
因此,在上面的方法中,我创建新列 FAILED_VALIDATIONS 并继续添加为 null 或空的字段名称。
但是即使只针对 4 个输入 json 文件,上述方法运行时间也太长。 有人可以告诉我如何优化这个吗? 如果需要的话,我愿意回答任何进一步的问题。
谢谢!!
不要使用 withColumn 或 withColumnRenamed 进行此类转换,将所有列添加为单个选择的一部分(临时缓存等既不需要,也不具有高性能)。
查询计划看起来像是经过优化的,但事实并非如此。可以参考基于 Scala 的类似分析作为证据。
此外,作为警告,构建像这样的大型表达式树可能导致构建操作缓慢(甚至生成的代码大小)。这是采用各种Quality引擎的方法的原因之一。