Source=Microsoft.DataTransfer.Common,''Type=Newtonsoft.Json.JsonReaderException,Message=Bad JSON escape sequence: \A

Question · Votes: 0 · Answers: 1

I am trying to load JSON data from an API source using Azure Data Factory. I am getting the following error, which is about a bad JSON escape sequence.

ErrorCode=JsonInvalidDataFormat,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Error occurred when deserializing source JSON file 'api/vc/report/16277292d17ac20a5d0d073d0f8fa45bf848a89e2e0c9fcebb707f187fcf9cdce88ceb70c'. Check if the data is in valid JSON object format.,Source=Microsoft.DataTransfer.Common,''Type=Newtonsoft.Json.JsonReaderException,Message=Bad JSON escape sequence: \A. Path 'Data[43999].work_order', line 1, position 39435083.,Source=Newtonsoft.Json,'

The above error occurs when the sink dataset's encoding is UTF-8 without BOM.

If I set the encoding to UTF-8 on the sink dataset in ADF, the data is loaded into ADLS as a JSON file successfully, but I cannot read/parse the JSON file in Databricks or a Data Flow (I tried reading it as both text and JSON). I get an error like: AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default)...
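The AnalysisException itself hints at a workaround: cache the parsed result before querying the corrupt-record column. A minimal sketch for inspecting the rows Spark could not parse, assuming the file was already copied to the ADLS path used later in this post:

# Read the malformed file permissively, cache it, then look at the
# records that ended up in _corrupt_record.
df = (spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .json("abfss://[email protected]/exmpl.json"))
df.cache()  # required before referencing _corrupt_record on its own
df.filter(df["_corrupt_record"].isNotNull()).show(truncate=False)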

Is there any way to replace the invalid escape sequences when reading from the source in ADF? Or is there any other solution to handle this scenario?

Here is the sample data; note the invalid \A escape in the work_order field:

{
    "Report ID": "123",
    "Report Name": "JSON_1",
    "Data as of Date": "2024-01-18 01:28 PM",
    "Report Date": "01/18/2024 08:27 AM",
    "Report Header": "",
    "Report Footer": "",
    "Data": [
        {
            "ID": "01",
            "Supplier": "COMPANY",
            "Type": "Standard",
            "Work_ID": "fg",**"work_order":"T\A Harvey"**,
            "Purchase_Order_Number": "2",
            "Statement_of_Work_Status": "Approved",
            "Primary Cost Object": "3 | BASE",
            "Primary Cost Object Code": "10014",
            "Item": "Limit ",
            "Item_ID": "rg23",
            "Item_Status": "R",
            "S_Worker_ID": "",
            "Worker_ID": "",
            "Worker_Name": "",
            "Master_Number": "",
            "Role_Code": "",
            "Role_Level": "",
            "Invoice_ID": "",
            "Item_Submit_Date": "12/09/2023 12:39 PM"....15 more columns
        }
    ]
}
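
For reference, \A fails in any strict JSON parser because the specification only allows the escapes \" \\ \/ \b \f \n \r \t and \uXXXX. A minimal Python check illustrating the same failure:

import json

# The source string reaches the parser containing the invalid escape \A.
try:
    json.loads('{"work_order": "T\\A Harvey"}')
except json.JSONDecodeError as e:
    print(e)  # e.g. Invalid \escape: line 1 column 18 (char 17)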

apache-spark pyspark azure-data-factory databricks azure-databricks
1 Answer
0 votes

Before using the file as a source in ADF, I tried the following approach in Databricks to replace the escape characters:

from pyspark.sql.types import StructType, StructField, StringType

# json_data holds the raw JSON text returned by the API as a single string.
# Strip the backslashes so that invalid escape sequences such as \A no
# longer break the JSON parser.
json_data = json_data.replace('\\', '')

schema = StructType([StructField("Report ID", StringType(), True),
                     StructField("Report Name", StringType(), True),
                     StructField("Data as of Date", StringType(), True),
                     StructField("Report Date", StringType(), True),
                     StructField("Report Header", StringType(), True),
                     StructField("Report Footer", StringType(), True),
                     StructField("Data", StringType(), True)])

# Parse the cleaned string into a one-row DataFrame and write it to ADLS.
df = spark.read.json(spark.sparkContext.parallelize([json_data]), schema=schema)
file_path = "abfss://[email protected]/exmpl.json"
df.write.mode("overwrite").json(file_path)

Result:

Report ID   Report Name Data as of Date Report Date Report Header   Report Footer   Data
123 JSON_1  2024-01-18 01:28 PM 01/18/2024 08:27 AM         [{"ID":"01","Supplier":"COMPANY","Type":"Standard","Work_ID":"fg","work_order":"TA Harvey","Purchase_Order_Number":"2","Statement_of_Work_Status":"Approved","Primary Cost Object":"3 | BASE","Primary Cost Object Code":"10014","Item":"Limit ","Item_ID":"rg23","Item_Status":"R","S_Worker_ID":"","Worker_ID":"","Worker_Name":"","Master_Number":"","Role_Code":"","Role_Level":"","Invoice_ID":"","Item_Submit_Date":"12/09/2023 12:39 PM"}]

The code above replaces the escape characters in the JSON data, converts the JSON string into a DataFrame, and writes it to ADLS.
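Note that replace('\\', '') removes every backslash, including valid escapes such as \" or \n. A more surgical variant (a sketch, not part of the original approach) removes only the backslashes that do not start a valid JSON escape:

import re

# Drop only backslashes NOT followed by a valid JSON escape character
# (", \, /, b, f, n, r, t, u); legitimate escapes are preserved.
# (For simplicity, \u is not validated against four hex digits, and
# runs of consecutive backslashes are not handled.)
json_data = re.sub(r'\\(?!["\\/bfnrtu])', '', json_data)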

Next, I used this cleaned JSON file in the ADF copy activity.

Alternatively, Azure Databricks can load JSON data fetched with the Python requests package.

Here is an example:

import requests

# Fetch JSON text from the API endpoint over HTTP.
resp = requests.get('https://reqres.in/api/users?page=1,name,href')
# Parallelize the response text into an RDD and read it as JSON.
db1 = spark.sparkContext.parallelize([resp.text])
df2 = spark.read.json(db1)

The requests module allows Python code to send HTTP requests and retrieve data from API endpoints. The response text is parallelized into a Spark RDD (Resilient Distributed Dataset), and spark.read.json then reads that RDD into a DataFrame.