PySpark: convert JSON array strings to arrays when writing to a file

Question  Votes: 0  Answers: 1

I have a DataFrame; a truncated version of it is shown below.

+--------------------+--------------------+--------------------+-----------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     input_file_name|              ROW_ID|  FAILED_VALIDATIONS|VALIDATION_STATUS|  MEMBER_ID|      RECORDS_HEADER|         RECORDS_001|         RECORDS_002|         RECORDS_003|         RECORDS_004|         RECORDS_005|         RECORDS_006|         RECORDS_007|            batch_id|
+--------------------+--------------------+--------------------+-----------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|s3://gov-solution...|a64fbbea-7cf3-43e...|                null|           PASSED|U7742139901|[{"RECORD_TYPE":"...|[{"BATCH_BEGIN_DA...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|d0f87fd5-fca1-4e2...|
|s3://gov-solution...|a64fbbea-7cf3-43e...|MEMBER_ID is missing|           FAILED|           |[{"RECORD_TYPE":"...|[{"BATCH_BEGIN_DA...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|d0f87fd5-fca1-4e2...|
|s3://gov-solution...|a64fbbea-7cf3-43e...|MEMBER_ID is miss...|           FAILED|           |[{"RECORD_TYPE":"...|[{"BATCH_BEGIN_DA...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|d0f87fd5-fca1-4e2...|
|s3://gov-solution...|a64fbbea-7cf3-43e...|                null|           PASSED|U6881487301|[{"RECORD_TYPE":"...|[{"BATCH_BEGIN_DA...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|[{"RECORD_TYPE":"...|                null|d0f87fd5-fca1-4e2...|
+--------------------+--------------------+--------------------+-----------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

When I use foreach() to write each record to a file in S3, it is written in the following format:

{
    "input_file_name": "s3://gov-solutions-dev-fulfillment2-files/Input-Data/EOBLetter/20240327/MONTHLY_EOB_U6881487301_20231218180516.json",
    "ROW_ID": "1f6df8f3-7cb7-4a29-999c-ea8f5923168a",
    "FAILED_VALIDATIONS": null,
    "VALIDATION_STATUS": "PASSED",
    "MEMBER_ID": "U6881487301",
    "RECORDS_HEADER": "[{\"RECORD_TYPE\":\"HEADER\",\"COUNT_OF_OBJECTS_RECORD_TYPE_001\":1,\"COUNT_OF_OBJECTS_RECORD_TYPE_002\":3,\"COUNT_OF_OBJECTS_RECORD_TYPE_003\":6,\"COUNT_OF_OBJECTS_RECORD_TYPE_004\":3,\"COUNT_OF_OBJECTS_RECORD_TYPE_005\":1,\"COUNT_OF_OBJECTS_RECORD_TYPE_006\":1,\"COUNT_OF_OBJECTS_RECORD_TYPE_007\":0}]",
    "RECORDS_001": "[{\"BATCH_BEGIN_DATE\":\"01/01/2023\",\"BATCH_END_DATE\":\"11/30/2023\",\"BATCH_ID\":1000000078,\"BATCH_RUN_DATE\":\"12/18/2023\",\"BATCH_STATUS\":\"COMPLETE\",\"COLATERAL_TYPE\":\"EOB\",\"INDIV_ID\":\"3003790\",\"MBR_PREF_LANG\":\"English\",\"MBR_PREF_LARGE_PRINT\":\"N\",\"MEMBER_ID\":\"U6881487301\",\"SOURCE_SYSTEM_ID\":\"GBSF\"}]",
    "RECORDS_002": "[{\"RECORD_TYPE\":\"002\",\"CLAIM_DOS_BEGIN_DT\":\"08/29/2023\",\"CLAIM_DOS_END_DT\":\"08/29/2023\",\"CLAIM_NUMBER\":\"23E002113200\",\"CLAIM_QMB_ELIG\":\"Y\",\"CLAIM_TYP\":\"H\",\"CLASS_ID\":\"\",\"CLASS_PLAN_ID\":\"\",\"CONTRACT_ID\":\"H0354\",\"GROUP_ID\":\"\",\"PAID_DATE\":\"09/15/2023\",\"PAR_IND\":\"N\",\"PBP\":\"027\",\"PROCESSED_DATE\":\"09/16/2023\",\"RECEIVED_DATE\":\"08/30/2023\",\"RENDERING_PROVIDER_NAME\":\"ESCHLER, DAVID J\",\"SEGMENT\":\"000\",\"PAR_IND_1\":\"Out-Of-Network\",\"CLAIM_QMB_ELIG_1\":\"Y\",\"ClaimLines_ServiceDataRange\":\"08/29/2023 - 08/29/2023\"},{\"RECORD_TYPE\":\"002\",\"CLAIM_DOS_BEGIN_DT\":\"05/25/2023\",\"CLAIM_DOS_END_DT\":\"05/25/2023\",\"CLAIM_NUMBER\":\"23P001125500\",\"CLAIM_QMB_ELIG\":\"N\",\"CLAIM_TYP\":\"H\",\"CLASS_ID\":\"\",\"CLASS_PLAN_ID\":\"\",\"CONTRACT_ID\":\"H0354\",\"GROUP_ID\":\"\",\"PAID_DATE\":\"06/15/2023\",\"PAR_IND\":\"Y\",\"PBP\":\"001\",\"PROCESSED_DATE\":\"06/16/2023\",\"RECEIVED_DATE\":\"05/30/2023\",\"RENDERING_PROVIDER_NAME\":\"PRAC HH, MHK HOME G.\",\"SEGMENT\":\"000\",\"PAR_IND_1\":\"In-Network\",\"CLAIM_QMB_ELIG_1\":\"\",\"ClaimLines_ServiceDataRange\":\"05/25/2023 - 05/25/2023\"},{\"RECORD_TYPE\":\"002\",\"CLAIM_DOS_BEGIN_DT\":\"01/12/2024\",\"CLAIM_DOS_END_DT\":\"01/12/2024\",\"CLAIM_NUMBER\":\"23E002114300\",\"CLAIM_QMB_ELIG\":\"N\",\"CLAIM_TYP\":\"H\",\"CLASS_ID\":\"\",\"CLASS_PLAN_ID\":\"\",\"CONTRACT_ID\":\"H0354\",\"GROUP_ID\":\"\",\"PAID_DATE\":\"01/15/2024\",\"PAR_IND\":\"Y\",\"PBP\":\"028\",\"PROCESSED_DATE\":\"01/16/2024\",\"RECEIVED_DATE\":\"01/30/2024\",\"RENDERING_PROVIDER_NAME\":\"ZEN, MAX\",\"SEGMENT\":\"000\",\"PAR_IND_1\":\"In-Network\",\"CLAIM_QMB_ELIG_1\":\"\",\"ClaimLines_ServiceDataRange\":\"01/12/2024 - 01/12/2024\"}]",
    "RECORDS_003": "[{\"RECORD_TYPE\":\"003\",\"ALLOWED\":\"0.00\",\"BILLING_CODE\":\"96401\",\"BILLING_CODE_DESC\":\"Complex medication injected\",\"CLAIMLN_DOS_BEGIN_DT\":\"08/29/2023\",\"CLAIMLN_DOS_END_DT\":\"08/29/2023\",\"CLAIM_LINE_NUMBER\":\"2\",\"CLAIM_NUMBER\":\"23E002113200\",\"CLAIM_STATUS_DENIED\":\"Y\",\"COINSURANCE\":\"$10.00\",\"COINSURANCE_PCT\":\"0.00\",\"COPAY\":\"Y\",\"DEDUCTIBLE\":\"$910.00\",\"LINE_NOTES_CODE\":\"B\",\"OVERAGE_AMOUNT\":\"$0.00\",\"PAID_AMT\":\"0.00\",\"PATIENT_RESPONSIBILITY\":\"$0.00\",\"TOTAL_CHARGE\":\"$475.00\",\"DEDUCTIBLE_STATEMENT\":\"Y\",\"COINSURANCE_STATEMENT\":\"Y\",\"OVERAGE_STATEMENT\":\"N\"},{\"RECORD_TYPE\":\"003\",\"ALLOWED\":\"0.00\",\"BILLING_CODE\":\"96401\",\"BILLING_CODE_DESC\":\"Complex medication injected\",\"CLAIMLN_DOS_BEGIN_DT\":\"08/29/2023\",\"CLAIMLN_DOS_END_DT\":\"08/29/2023\",\"CLAIM_LINE_NUMBER\":\"4\",\"CLAIM_NUMBER\":\"23E002113200\",\"CLAIM_STATUS_DENIED\":\"Y\",\"COINSURANCE\":\"$0.00\",\"COINSURANCE_PCT\":\"0.00\",\"COPAY\":\"N\",\"DEDUCTIBLE\":\"$0.00\",\"LINE_NOTES_CODE\":\"B\",\"OVERAGE_AMOUNT\":\"$0.00\",\"PAID_AMT\":\"0.00\",\"PATIENT_RESPONSIBILITY\":\"$0.00\",\"TOTAL_CHARGE\":\"$475.00\",\"DEDUCTIBLE_STATEMENT\":\"N\",\"COINSURANCE_STATEMENT\":\"N\",\"OVERAGE_STATEMENT\":\"N\"},{\"RECORD_TYPE\":\"003\",\"ALLOWED\":\"900.00\",\"BILLING_CODE\":\"A0428\",\"BILLING_CODE_DESC\":\"Ambulance 
transport\",\"CLAIMLN_DOS_BEGIN_DT\":\"01/12/2024\",\"CLAIMLN_DOS_END_DT\":\"01/12/2024\",\"CLAIM_LINE_NUMBER\":\"1\",\"CLAIM_NUMBER\":\"23E002114300\",\"CLAIM_STATUS_DENIED\":\"N\",\"COINSURANCE\":\"$0.00\",\"COINSURANCE_PCT\":\"0.00\",\"COPAY\":\"Y\",\"DEDUCTIBLE\":\"$0.00\",\"LINE_NOTES_CODE\":\"\",\"OVERAGE_AMOUNT\":\"$0.00\",\"PAID_AMT\":\"700.00\",\"PATIENT_RESPONSIBILITY\":\"$200.00\",\"TOTAL_CHARGE\":\"$1800.00\",\"DEDUCTIBLE_STATEMENT\":\"N\",\"COINSURANCE_STATEMENT\":\"N\",\"OVERAGE_STATEMENT\":\"N\"},{\"RECORD_TYPE\":\"003\",\"ALLOWED\":\"0.00\",\"BILLING_CODE\":\"96401\",\"BILLING_CODE_DESC\":\"Complex medication injected\",\"CLAIMLN_DOS_BEGIN_DT\":\"08/29/2023\",\"CLAIMLN_DOS_END_DT\":\"08/29/2023\",\"CLAIM_LINE_NUMBER\":\"1\",\"CLAIM_NUMBER\":\"23E002113200\",\"CLAIM_STATUS_DENIED\":\"Y\",\"COINSURANCE\":\"$0.00\",\"COINSURANCE_PCT\":\"0.00\",\"COPAY\":\"N\",\"DEDUCTIBLE\":\"$0.00\",\"LINE_NOTES_CODE\":\"B\",\"OVERAGE_AMOUNT\":\"$0.00\",\"PAID_AMT\":\"0.00\",\"PATIENT_RESPONSIBILITY\":\"$0.00\",\"TOTAL_CHARGE\":\"$475.00\",\"DEDUCTIBLE_STATEMENT\":\"N\",\"COINSURANCE_STATEMENT\":\"N\",\"OVERAGE_STATEMENT\":\"N\"},{\"RECORD_TYPE\":\"003\",\"ALLOWED\":\"0.00\",\"BILLING_CODE\":\"96401\",\"BILLING_CODE_DESC\":\"Complex medication injected\",\"CLAIMLN_DOS_BEGIN_DT\":\"08/29/2023\",\"CLAIMLN_DOS_END_DT\":\"08/29/2023\",\"CLAIM_LINE_NUMBER\":\"3\",\"CLAIM_NUMBER\":\"23E002113200\",\"CLAIM_STATUS_DENIED\":\"Y\",\"COINSURANCE\":\"$0.00\",\"COINSURANCE_PCT\":\"0.00\",\"COPAY\":\"N\",\"DEDUCTIBLE\":\"$0.00\",\"LINE_NOTES_CODE\":\"B\",\"OVERAGE_AMOUNT\":\"$40.00\",\"PAID_AMT\":\"0.00\",\"PATIENT_RESPONSIBILITY\":\"$0.00\",\"TOTAL_CHARGE\":\"$475.00\",\"DEDUCTIBLE_STATEMENT\":\"N\",\"COINSURANCE_STATEMENT\":\"N\",\"OVERAGE_STATEMENT\":\"Y\"},{\"RECORD_TYPE\":\"003\",\"ALLOWED\":\"335.77\",\"BILLING_CODE\":\"G6015\",\"BILLING_CODE_DESC\":\"Radiation treatment 
(IMRT)\",\"CLAIMLN_DOS_BEGIN_DT\":\"05/25/2023\",\"CLAIMLN_DOS_END_DT\":\"05/25/2023\",\"CLAIM_LINE_NUMBER\":\"1\",\"CLAIM_NUMBER\":\"23P001125500\",\"CLAIM_STATUS_DENIED\":\"N\",\"COINSURANCE\":\"$67.15\",\"COINSURANCE_PCT\":\"20.00\",\"COPAY\":\"N\",\"DEDUCTIBLE\":\"$0.00\",\"LINE_NOTES_CODE\":\"\",\"OVERAGE_AMOUNT\":\"$0.00\",\"PAID_AMT\":\"268.62\",\"PATIENT_RESPONSIBILITY\":\"$67.15\",\"TOTAL_CHARGE\":\"$1000.00\",\"DEDUCTIBLE_STATEMENT\":\"N\",\"COINSURANCE_STATEMENT\":\"Y\",\"OVERAGE_STATEMENT\":\"N\"}]",
    "RECORDS_004": "[{\"RECORD_TYPE\":\"004\",\"CLAIM_ALLOWED\":\"900.00\",\"CLAIM_COINSURANCE\":\"0.00\",\"CLAIM_COPAY\":\"200.00\",\"CLAIM_DEDUCTIBLE\":\"0.00\",\"CLAIM_NUMBER\":\"23E002114300\",\"CLAIM_OVERAGE_AMOUNT\":\"0.00\",\"CLAIM_PAID_AMT\":\"$700.00\",\"CLAIM_PATIENT_RESPONSIBILITY\":\"200.00\",\"CLAIM_TOTAL_CHARGE\":\"1800.00\"},{\"RECORD_TYPE\":\"004\",\"CLAIM_ALLOWED\":\"335.77\",\"CLAIM_COINSURANCE\":\"67.15\",\"CLAIM_COPAY\":\"0.00\",\"CLAIM_DEDUCTIBLE\":\"0.00\",\"CLAIM_NUMBER\":\"23P001125500\",\"CLAIM_OVERAGE_AMOUNT\":\"0.00\",\"CLAIM_PAID_AMT\":\"$268.62\",\"CLAIM_PATIENT_RESPONSIBILITY\":\"67.15\",\"CLAIM_TOTAL_CHARGE\":\"1000.00\"},{\"RECORD_TYPE\":\"004\",\"CLAIM_ALLOWED\":\"0.00\",\"CLAIM_COINSURANCE\":\"0.00\",\"CLAIM_COPAY\":\"0.00\",\"CLAIM_DEDUCTIBLE\":\"0.00\",\"CLAIM_NUMBER\":\"23E002113200\",\"CLAIM_OVERAGE_AMOUNT\":\"0.00\",\"CLAIM_PAID_AMT\":\"$0.00\",\"CLAIM_PATIENT_RESPONSIBILITY\":\"0.00\",\"CLAIM_TOTAL_CHARGE\":\"1900.00\"}]",
    "RECORDS_005": "[{\"RECORD_TYPE\":\"005\",\"PROCESSED_PERIOD_ALLOWED_AMT\":\"1235.77\",\"PROCESSED_PERIOD_AMOUNT_CHARGED\":\"4700.00\",\"PROCESSED_PERIOD_PAID_AMT\":\"968.62\",\"PROCESSED_PERIOD_PATIENT_RESPONSIBILITY\":\"267.15\"}]",
    "RECORDS_006": "[{\"RECORD_TYPE\":\"006\",\"CLAIM_NOTE_CODE\":\"B\",\"END_DATE\":\"11/30/2024\",\"MAX_SERVICE_CTG_DEDUCTIBLE\":\"$0.00\",\"MEM_PAID_DEDUCTIBLE\":\"$0.00\",\"MEM_PAID_MOOP_COMBINED\":\"0.00\",\"MEM_PAID_MOOP_INNETWRK\":\"267.15\",\"MOOP_MAX_COMBINED\":\"0.00\",\"MOOP_MAX_INNETWRK\":\"3100.00\",\"PAID_TOWARDS_SERVICE_CTG\":\"$0.00\",\"PLAN_COINSURANCE\":\"Y\",\"PLAN_COPAY\":\"Y\",\"PLAN_DEDUCTIBLE\":\"$0.00\",\"PLAN_YEAR\":\"2024\",\"PLAN_YEAR_BEGIN_DT\":\"01/01/2024\",\"PLAN_YEAR_END_DT\":\"12/31/2024\",\"SERVICE_CTG_IND\":\"N\",\"SERVICE_CTG_TYPE\":\"\",\"START_DATE\":\"01/01/2024\",\"YT_ALLOWED_AMOUNT\":\"1235.77\",\"YT_AMOUNT_CHARGED\":\"4700.00\",\"YT_MEMLIABILITY_AMT\":\"$267.15\",\"YT_PAID_AMT\":\"968.62\",\"PLAN_DEDUCTIBLE_BAR_GRAPH\":\"N\"}]",
    "RECORDS_007": null,
    "batch_id": "302bdcc8-dec1-445a-86a6-0409d3959b75"
}

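What I want is for the fields whose names start with RECORDS_ to be written to the file as actual JSON (arrays of objects) rather than as JSON strings.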

I have the following function, which I run with foreach() to write each row to a file in S3:

import json

import boto3


def write_final_json(row):
    print("in write_final_json")
    if row.VALIDATION_STATUS == 'PASSED':
        # Initialize S3 client
        s3 = boto3.client('s3')
    
        # S3 bucket and directory
        bucket_name = 'gov-solutions-dev-fulfillment2-files'
        output_dir = 'Extracted-Data/EOBLetter/Ready'
        
        # Create JSON string from row
        json_data = json.dumps(row.asDict())
        
        # Define S3 key
        s3_key = f"{output_dir}/{row.batch_id}/{row.MEMBER_ID}.json"
    
        # Write JSON data to S3
        s3.put_object(Body=json_data, Bucket=bucket_name, Key=s3_key)

Please let me know where I can convert the JSON strings to JSON so that the file contains only JSON.

python apache-spark pyspark aws-glue
1 Answer
0 votes

Try this:

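Assuming your data is in a variable named df, first collect all columns whose names start with RECORDS_. Because the schema of these columns is dynamic, use schema_of_json to infer it from a sample value, then pass that schema to from_json to convert the JSON string into a JSON object (an array of structs).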

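from pyspark.sql import functions as F

# Columns that hold JSON array strings
cols = [x for x in df.columns if x.startswith('RECORDS_')]

for c in cols:
    # Sample the first non-null value; head() returns None if there is none
    sample = df.where(F.col(c).isNotNull()).select(c).head()
    if sample is not None:
        # Infer the schema from the sample, then parse the string column
        schema = F.schema_of_json(sample[0])
        df = df.withColumn(c, F.from_json(F.col(c), schema))

df.write.json('s3a://...')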
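
Note that df.write.json writes part files under the target path rather than one object per MEMBER_ID. If you would rather keep the foreach() and boto3 approach from the question, a minimal sketch of the alternative is to decode the RECORDS_* strings with json.loads before serializing the row. The helper name row_to_json and the sample dictionary below are hypothetical, made up for illustration; the sketch also assumes every non-null RECORDS_* value is valid JSON.

```python
import json


def row_to_json(row_dict, prefix="RECORDS_"):
    """Serialize a row dict, decoding JSON-string fields whose names start with prefix."""
    parsed = {}
    for key, value in row_dict.items():
        if key.startswith(prefix) and isinstance(value, str):
            # Decode the embedded JSON so it is emitted as a nested array/object
            parsed[key] = json.loads(value)
        else:
            # Leave other fields (and null RECORDS_* values) untouched
            parsed[key] = value
    return json.dumps(parsed)


# Hypothetical stand-in for row.asDict(), shaped like the data in the question
sample = {
    "MEMBER_ID": "U6881487301",
    "RECORDS_005": '[{"RECORD_TYPE":"005","PROCESSED_PERIOD_PAID_AMT":"968.62"}]',
    "RECORDS_007": None,
}
print(row_to_json(sample))
```

Inside write_final_json, this would replace json.dumps(row.asDict()) with row_to_json(row.asDict()); the rest of the function can stay as it is.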