我正在尝试编写一个 Airflow DAG,它将使用 Python 方法将
.XML
文件加载到 BigQuery,但我相信它需要转换为 .json
才能工作。
我编写了以下代码将
.XML
文件转换为 .json
,但我不确定这是否正确,以及如何编写 Python 方法来实现此目的:
import json, xmltodict
def load_xml_to_bq():
with open("xml_file.xml") as xml_file:
data_dict = xmltodict.parse(xml_file.read())
xml_file.close()
json_data = json.dumps(data_dict)
with open("data.json", "w") as json_file:
json_file.write(json_data)
json_file.close()
此外,是否需要从 GCS 存储桶加载
.XML
文件才能将其插入 BigQuery?
此外,是否需要将以下代码添加到我的函数中才能将
.XML
插入 BigQuery?
client = bigquery.Client()
client.insert_rows_json(f'{dataset}.{table}', dec_sale_list)
谢谢 - 任何有关如何实现此目标的帮助都会有所帮助;我觉得我有一些正确的概念,但我不确定需要添加/删除什么才能做到这一点。
您也可以使用以下2种解决方案。
解决方案1:
PythonOperator
使用纯 Python
代码和一个名为 xmltodict
GCSToBigqueryOperator
文件中的 Json
到 Bigquery
dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'
def transform_xml_file_to_json():
import json, xmltodict
from google.cloud import storage
with open(xml_file_path_gcs) as xml_file:
data_dict = xmltodict.parse(xml_file.read())
xml_file.close()
json_data = json.dumps(data_dict)
client = storage.Client()
bucket = client.get_bucket("your-bucket")
blob = bucket.blob('your_file.json')
blob.upload_from_string(data=json.dumps(json_data), content_type='application/json')
with airflow.DAG(
"dag",
default_args=default_dag_args,
schedule_interval=None) as dag:
xml_to_json = PythonOperator(
task_id="transform_xml_to_json",
python_callable=transform_xml_file_to_json
)
gcs_to_bq = GCSToBigQueryOperator(
task_id="gcs_to_bq",
bucket="your-bucket",
source_objects=['*.json'],
destination_project_dataset_table="your_table",
schema_object=f"dags/schema/creditsafe/{data_type}.json",
source_format="NEWLINE_DELIMITED_JSON",
....,
)
xml_to_json >> gcs_to_bq
Python 客户端将 json 文件上传到
GCS
BigQuery
解决方案2:
所有工作都在
PythonOperator
:
Dicts
Python
客户端和insert_rows
方法dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'
def load_xml_file_to_bq():
import json, xmltodict
from google.cloud import bigquery
with open(xml_file_path_gcs) as xml_file:
data_dicts = xmltodict.parse(xml_file.read())
xml_file.close()
# Check if you have to transform data_dicts to the expected list of Dict. A Dict need to match exactly with the schema of the BQ table
client = bigquery.Client()
client.insert_rows_json('dataset.table', data_dicts)
with airflow.DAG(
"dag",
default_args=default_dag_args,
schedule_interval=None) as dag:
load_xml_file_to_bq = PythonOperator(
task_id="load_xml_file_to_bq",
python_callable=load_xml_file_to_bq
)
load_xml_file_to_bq
检查是否必须将
data_dicts
转换为预期的 Dict
列表。列表中的 Dict
需要与 BigQuery
表的架构完全匹配。
你要小心,如果你的xml文件太大,不建议在节点中加载重元素。
您可以使用一些在线工具来验证从 xml 生成的 JSON 输出看起来是否正常(https://www.utilities-online.info/xmltojson)
例如下面的xml
<?xml version="1.0"?>
<customers>
<customer id="55000">
<name>Charter Group</name>
<address>
<street>100 Main</street>
<city>Framingham</city>
<state>MA</state>
<zip>01701</zip>
</address>
<address>
<street>720 Prospect</street>
<city>Framingham</city>
<state>MA</state>
<zip>01701</zip>
</address>
<address>
<street>120 Ridge</street>
<state>MA</state>
<zip>01760</zip>
</address>
</customer>
</customers>
将生成这种格式的 JSON
{
"customers": {
"customer": {
"-id": "55000",
"name": "Charter Group",
"address": [
{
"street": "100 Main",
"city": "Framingham",
"state": "MA",
"zip": "01701"
},
{
"street": "720 Prospect",
"city": "Framingham",
"state": "MA",
"zip": "01701"
},
{
"street": "120 Ridge",
"state": "MA",
"zip": "01760"
}
]
}
}
}
您可以通过多种方法在 Python 中进行 xml 到 Json 的转换。您可以使用 xmltodict、xml.etree.ElementTree、minidom。不同库的代码性能会有所不同。如果它是一个相对较小的 xml 文件,那么使用 xmltodict 就可以了。如果 Xml 文件太大,那么您需要考虑从计算引擎运行 Python 脚本,因为转换过程将占用大量资源。为此,您可以使用 Airflow Google Compute Engine Operators (https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/operator/gcp/compute.html) 来启动、停止 GCE 实例.
您可以在Python脚本中指定xml文件路径(如果文件存在于GCS中,则该路径将成为文件所在的GCS路径)。如果您使用 Airflow,则可以使用 GoogleCloudStorageToBigQueryOperator (https://airflow.readthedocs.io/en/1.10.7/howto/operator/gcp/gcs.html) 将生成的 Json 加载到 BigQuery
总结一下
我将分享一个可以使用的 Python 代码片段(用于 xml 到 JSON 转换的基本 Python 脚本)
import json
import xmltodict
with open("xml_file.xml") as xml_file:
data_dict = xmltodict.parse(xml_file.read())
json_data = json.dumps(data_dict)
with open("data.json", "w") as json_file:
json_file.write(json_data)
json_file.close()