I am trying to load JSON data from an API source using Azure Data Factory. I am getting the following error, which points to a bad JSON escape sequence.
ErrorCode=JsonInvalidDataFormat,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Error occurred when deserializing source JSON file 'api/vc/report/16277292d17ac20a5d0d073d0f8fa45bf848a89e2e0c9fcebb707f187fcf9cdce88ceb70c'. Check if the data is in valid JSON object format.,Source=Microsoft.DataTransfer.Common,''Type=Newtonsoft.Json.JsonReaderException,Message=Bad JSON escape sequence: \A. Path 'Data[43999].work_order', line 1, position 39435083.,Source=Newtonsoft.Json,'
The above error occurs when the sink dataset's encoding is set to UTF-8 without BOM.
If I set the encoding to UTF-8 in the destination dataset in ADF, the data is loaded into ADLS as a JSON file successfully, but I cannot read/parse the JSON file in Databricks/Data Flow (tried reading it as both text and JSON). I get an error like: AnalysisException: Since Spark 2.3, ... when the referenced columns only include the internal corrupt record column (named _corrupt_record by default)...
Is there any way to replace the invalid escape sequences while reading from the source in ADF? Or is there another solution to handle this situation?
Here is the sample data:
{
"Report ID": "123",
"Report Name": "JSON_1",
"Data as of Date": "2024-01-18 01:28 PM",
"Report Date": "01/18/2024 08:27 AM",
"Report Header": "",
"Report Footer": "",
"Data": [
{
"ID": "01",
"Supplier": "COMPANY",
"Type": "Standard",
"Work_ID": "fg",**"work_order":"T\A Harvey"**,
"Purchase_Order_Number": "2",
"Statement_of_Work_Status": "Approved",
"Primary Cost Object": "3 | BASE",
"Primary Cost Object Code": "10014",
"Item": "Limit ",
"Item_ID": "rg23",
"Item_Status": "R",
"S_Worker_ID": "",
"Worker_ID": "",
"Worker_Name": "",
"Master_Number": "",
"Role_Code": "",
"Role_Level": "",
"Invoice_ID": "",
"Item_Submit_Date": "12/09/2023 12:39 PM"....15 more columns
}
]
}
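For reference, `\A` is rejected by standard JSON parsers just as Newtonsoft.Json rejects it here: the only valid escapes in a JSON string are `\"`, `\\`, `\/`, `\b`, `\f`, `\n`, `\r`, `\t`, and `\uXXXX`. The failure can be reproduced with Python's built-in `json` module:

```python
import json

# A properly escaped backslash parses fine...
good = '{"work_order": "T\\\\A Harvey"}'
print(json.loads(good)["work_order"])  # T\A Harvey

# ...but the raw \A sequence as it arrives from the API does not.
bad = '{"work_order": "T\\A Harvey"}'
try:
    json.loads(bad)
except json.JSONDecodeError as e:
    print("rejected:", e)
```

So the payload has to be repaired (or the producer fixed) before any strict JSON parser, ADF's included, will accept it.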
Before using it as a source in ADF, I tried the following approach in Databricks to replace the escape characters:
from pyspark.sql.types import StructType, StructField, StringType

# Strip all backslashes from the raw JSON text (note: this also removes
# valid escape sequences, not just the invalid ones).
json_data = json_data.replace('\\', '')

schema = StructType([
    StructField("Report ID", StringType(), True),
    StructField("Report Name", StringType(), True),
    StructField("Data as of Date", StringType(), True),
    StructField("Report Date", StringType(), True),
    StructField("Report Header", StringType(), True),
    StructField("Report Footer", StringType(), True),
    StructField("Data", StringType(), True),
])

df = spark.read.json(spark.sparkContext.parallelize([json_data]), schema=schema)
file_path = "abfss://[email protected]/exmpl.json"
df.write.mode("overwrite").json(file_path)
Result:
Report ID Report Name Data as of Date Report Date Report Header Report Footer Data
123 JSON_1 2024-01-18 01:28 PM 01/18/2024 08:27 AM [{"ID":"01","Supplier":"COMPANY","Type":"Standard","Work_ID":"fg","work_order":"TA Harvey","Purchase_Order_Number":"2","Statement_of_Work_Status":"Approved","Primary Cost Object":"3 | BASE","Primary Cost Object Code":"10014","Item":"Limit ","Item_ID":"rg23","Item_Status":"R","S_Worker_ID":"","Worker_ID":"","Worker_Name":"","Master_Number":"","Role_Code":"","Role_Level":"","Invoice_ID":"","Item_Submit_Date":"12/09/2023 12:39 PM"}]
The code above replaces the escape characters in the JSON data, converts the JSON string into a DataFrame, and writes it to ADLS.
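Note that `replace('\\', '')` deletes every backslash, including valid escapes such as `\"` or `\n`, which is why `T\A Harvey` came out as `TA Harvey` in the result above. A more surgical sketch (a hypothetical helper, assuming the whole payload is available as one text string) doubles only the backslashes that do not start a valid JSON escape, so the original characters survive:

```python
import json
import re

def repair_invalid_escapes(text: str) -> str:
    """Double any backslash that does not begin a valid JSON escape,
    turning e.g. \\A into \\\\A so strict parsers accept it.
    (Simplified: the \\u case is not checked for four hex digits.)"""
    return re.sub(r'\\(?!["\\/bfnrtu])', r'\\\\', text)

raw = '{"work_order": "T\\A Harvey"}'   # contains the bad \A escape
fixed = repair_invalid_escapes(raw)
print(json.loads(fixed)["work_order"])  # T\A Harvey
```

This would run as a preprocessing step before handing the text to Spark or before ADF picks the file up. Spark's JSON reader also exposes an `allowBackslashEscapingAnyCharacter` option that may tolerate such sequences directly when reading in Databricks.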
Next, I used the resulting JSON file as the destination in the copy activity.
Alternatively, Azure Databricks can load the JSON data itself using the Python requests package.
Here is an example:
import requests
resp = requests.get('https://reqres.in/api/users?page=1,name,href')
db1 = spark.sparkContext.parallelize([resp.text])
df2 = spark.read.json(db1)
The requests module allows Python code to send HTTP requests and retrieve data from API endpoints. spark.sparkContext.parallelize distributes the response text as a Spark RDD (Resilient Distributed Dataset), and spark.read.json then reads the JSON from that RDD.