我有一个场景,S3 存储桶中会有多个 json 文件,我的 Pyspark 脚本会将其加载到数据帧中。
每个 json 将具有以下结构 -
[
{
"RECORDS_HEADER": {
"RECORD_TYPE": "HEADER",
"DATA": {
"COUNT_OF_OBJECTS_RECORD_TYPE_001": 1,
"COUNT_OF_OBJECTS_RECORD_TYPE_002": 3,
"COUNT_OF_OBJECTS_RECORD_TYPE_003": 6,
"COUNT_OF_OBJECTS_RECORD_TYPE_004": 3,
"COUNT_OF_OBJECTS_RECORD_TYPE_005": 1,
"COUNT_OF_OBJECTS_RECORD_TYPE_006": 1,
"COUNT_OF_OBJECTS_RECORD_TYPE_007": 0
}
},
"RECORDS_001": {
"RECORD_TYPE": "001",
"DATA": {
"MEMBER_ID": "U3652928373",
"INDIV_ID": "3003790",
"MBR_PREF_LANG": "English",
"MBR_PREF_LARGE_PRINT": "N",
"BATCH_BEGIN_DATE": "01/01/2023",
"BATCH_END_DATE": "11/30/2023",
"BATCH_RUN_DATE": "12/18/2023",
"BATCH_ID": 1000000078,
"BATCH_STATUS": "COMPLETE",
"COLATERAL_TYPE": "EOB"
}
},
"RECORDS_002": {
"RECORD_TYPE": "002",
"DATA": [
{
"CLAIM_NUMBER": "232423113200",
"RECEIVED_DATE": "08/30/2023",
"PROCESSED_DATE": "09/16/2023",
"PAID_DATE": "09/15/2023",
"CLAIM_DOS_BEGIN_DT": "08/29/2023",
"CLAIM_DOS_END_DT": "08/29/2023",
"CONTRACT_ID": "H0354",
"PBP": "027",
"SEGMENT": "000",
"GROUP_ID": "",
"CLASS_ID": "",
"CLASS_PLAN_ID": "",
"RENDERING_PROVIDER_NAME": "ESCHLER, DAVID J",
"CLAIM_TYP": "H",
"PAR_IND": "Y",
"CLAIM_QMB_ELIG": "N"
},
{
"CLAIM_NUMBER": "23P001125500",
"RECEIVED_DATE": "05/30/2023",
"PROCESSED_DATE": "06/16/2023",
"PAID_DATE": "06/15/2023",
"CLAIM_DOS_BEGIN_DT": "05/25/2023",
"CLAIM_DOS_END_DT": "05/25/2023",
"CONTRACT_ID": "H0354",
"PBP": "001",
"SEGMENT": "000",
"GROUP_ID": "",
"CLASS_ID": "",
"CLASS_PLAN_ID": "",
"RENDERING_PROVIDER_NAME": "PRAC HH, MHK HOME G.",
"CLAIM_TYP": "H",
"PAR_IND": "Y",
"CLAIM_QMB_ELIG": "N"
},
{
"CLAIM_NUMBER": "23E002114300",
"RECEIVED_DATE": "01/30/2024",
"PROCESSED_DATE": "01/16/2024",
"PAID_DATE": "01/15/2024",
"CLAIM_DOS_BEGIN_DT": "01/12/2024",
"CLAIM_DOS_END_DT": "01/12/2024",
"CONTRACT_ID": "H0354",
"PBP": "028",
"SEGMENT": "000",
"GROUP_ID": "",
"CLASS_ID": "",
"CLASS_PLAN_ID": "",
"RENDERING_PROVIDER_NAME": "ZEN, MAX",
"CLAIM_TYP": "H",
"PAR_IND": "Y",
"CLAIM_QMB_ELIG": "N"
}
]
},
"RECORDS_003": {
"RECORD_TYPE": "003",
"DATA": [
{
"CLAIM_NUMBER": "23E002113200",
"CLAIM_LINE_NUMBER": "2",
"CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
"CLAIMLN_DOS_END_DT": "08/29/2023",
"BILLING_CODE": "96401",
"BILLING_CODE_DESC": "Complex medication injected",
"LINE_NOTES_CODE": "B",
"TOTAL_CHARGE": "475.00",
"ALLOWED": "0.00",
"PAID_AMT": "0.00",
"PATIENT_RESPONSIBILITY": "0.00",
"DEDUCTIBLE": "0.00",
"COINSURANCE": "0.00",
"COINSURANCE_PCT": "0.00",
"COPAY": "0.00",
"CLAIM_STATUS_DENIED": "Y",
"OVERAGE_AMOUNT": "0.00"
},
{
"CLAIM_NUMBER": "23E002113200",
"CLAIM_LINE_NUMBER": "4",
"CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
"CLAIMLN_DOS_END_DT": "08/29/2023",
"BILLING_CODE": "96401",
"BILLING_CODE_DESC": "Complex medication injected",
"LINE_NOTES_CODE": "B",
"TOTAL_CHARGE": "475.00",
"ALLOWED": "0.00",
"PAID_AMT": "0.00",
"PATIENT_RESPONSIBILITY": "0.00",
"DEDUCTIBLE": "0.00",
"COINSURANCE": "0.00",
"COINSURANCE_PCT": "0.00",
"COPAY": "0.00",
"CLAIM_STATUS_DENIED": "Y",
"OVERAGE_AMOUNT": "0.00"
}
]
},
"RECORDS_004": {
"RECORD_TYPE": "004",
"DATA": [
{
"CLAIM_NUMBER": "123443536",
"CLAIM_TOTAL_CHARGE": "1800.00",
"CLAIM_ALLOWED": "900.00",
"CLAIM_PAID_AMT": "700.00",
"CLAIM_PATIENT_RESPONSIBILITY": "200.00",
"CLAIM_COPAY": "200.00",
"CLAIM_COINSURANCE": "0.00",
"CLAIM_DEDUCTIBLE": "0.00",
"CLAIM_OVERAGE_AMOUNT": "0.00"
},
{
"CLAIM_NUMBER": "123443536",
"CLAIM_TOTAL_CHARGE": "1000.00",
"CLAIM_ALLOWED": "335.77",
"CLAIM_PAID_AMT": "268.62",
"CLAIM_PATIENT_RESPONSIBILITY": "67.15",
"CLAIM_COPAY": "0.00",
"CLAIM_COINSURANCE": "67.15",
"CLAIM_DEDUCTIBLE": "0.00",
"CLAIM_OVERAGE_AMOUNT": "0.00"
},
{
"CLAIM_NUMBER": "123443536",
"CLAIM_TOTAL_CHARGE": "1900.00",
"CLAIM_ALLOWED": "0.00",
"CLAIM_PAID_AMT": "0.00",
"CLAIM_PATIENT_RESPONSIBILITY": "0.00",
"CLAIM_COPAY": "0.00",
"CLAIM_COINSURANCE": "0.00",
"CLAIM_DEDUCTIBLE": "0.00",
"CLAIM_OVERAGE_AMOUNT": "0.00"
}
]
},
"RECORDS_005": {
"RECORD_TYPE": "005",
"DATA": {
"PROCESSED_PERIOD_AMOUNT_CHARGED": "4700.00",
"PROCESSED_PERIOD_ALLOWED_AMT": "1235.77",
"PROCESSED_PERIOD_PAID_AMT": "968.62",
"PROCESSED_PERIOD_PATIENT_RESPONSIBILITY": "267.15"
}
},
"RECORDS_006": {
"RECORD_TYPE": "006",
"DATA": {
"YEAR_BEGIN_DT": "01/01/2024",
"YEAR_END_DT": "12/31/2024",
"START_DATE": "01/01/2024",
"END_DATE": "11/30/2024"
}
}
}
]
当所有 json 文件都加载到单个数据帧中时,数据帧看起来像下面的格式 -
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| RECORDS_001| RECORDS_002| RECORDS_003| RECORDS_004| RECORDS_005| RECORDS_006| RECORDS_HEADER|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{{01/01/2023, 11/...|{[{08/29/2023, 08...|{[{0.00, 96401, C...|{[{900.00, 0.00, ...|{{1235.77, 4700.0...|{{B, 11/30/2024, ...|{{1, 3, 6, 3, 1, ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
现在我需要对嵌套 json 进行转换。以 RECORDS_002 为例:我需要更改每一行中 RECORDS_002.DATA 数组内所有记录的 PROCESSED_DATE 字段的日期格式。同样,其他嵌套 json 字段也需要各自的转换。
请让我知道如何对 PySpark 数据框中的嵌套 json 字段执行转换,而不会使过程太慢。
如果需要,我愿意回答任何进一步的问题。
谢谢!!
看看这个:
import pyspark.sql.functions as f

# NOTE(review): assumes an active SparkSession bound to `spark` (spark-shell /
# notebook / Glue). If running standalone, create one via SparkSession.builder.
#
# `transform(...)` is Spark's higher-order array function: it maps the lambda
# over every element of the nested array column RECORDS_002.DATA.PROCESSED_DATE
# in place, so no explode/groupBy round-trip is needed and the whole operation
# stays a single narrow projection per row — fast even across many files.
df = (
    spark.read.option('multiline', True).json('./test_json.json')
    .withColumn(
        'RECORDS002_PROCESSED_DATE_transformed',
        # to_date parses the "MM/dd/yyyy" strings directly (the field is a
        # date, so no timestamp detour is needed); date_format then renders
        # each element as "yyyy-MM-dd".
        f.expr(
            'transform(RECORDS_002.DATA.PROCESSED_DATE, '
            'x -> date_format(to_date(x, "MM/dd/yyyy"), "yyyy-MM-dd"))'
        ),
    )
    # Show original vs. transformed arrays side by side for verification.
    .select('RECORDS_002.DATA.PROCESSED_DATE', 'RECORDS002_PROCESSED_DATE_transformed')
)
输出为:
+------------------------------------+-------------------------------------+
|PROCESSED_DATE |RECORDS002_PROCESSED_DATE_transformed|
+------------------------------------+-------------------------------------+
|[09/16/2023, 06/16/2023, 01/16/2024]|[2023-09-16, 2023-06-16, 2024-01-16] |
+------------------------------------+-------------------------------------+