Airflow:如何将 XML 文件加载到 BigQuery?

问题描述 投票:0回答:2

我正在尝试编写一个 Airflow DAG,它将使用 Python 方法将

.XML
文件加载到 BigQuery,但我相信它需要转换为
.json
才能工作。

我编写了以下代码将

.XML
文件转换为
.json
,但我不确定这是否正确,以及如何编写 Python 方法来实现此目的:

import json, xmltodict

def load_xml_to_bq():
    
with open("xml_file.xml") as xml_file:
    data_dict = xmltodict.parse(xml_file.read())
    xml_file.close()
json_data = json.dumps(data_dict)
with open("data.json", "w") as json_file:
        json_file.write(json_data)
        json_file.close()

此外,是否需要从 GCS 存储桶加载

.XML
文件才能将其插入 BigQuery?

此外,是否需要将以下代码添加到我的函数中才能将

.XML
插入 BigQuery?

    client = bigquery.Client()
    client.insert_rows_json(f'{dataset}.{table}', dec_sale_list)

谢谢 - 任何有关如何实现此目标的帮助都会有所帮助;我觉得我有一些正确的概念,但我不确定需要添加/删除什么才能做到这一点。

python google-cloud-platform google-bigquery google-cloud-functions airflow
2个回答
1
投票

您也可以使用以下2种解决方案。

解决方案1:

  • PythonOperator
    使用纯
    Python
    代码和一个名为
    xmltodict
  • 的库将 xml 文件转换为 json
  • 然后使用
    GCSToBigqueryOperator
    文件中的
    Json
    Bigquery
dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

def transform_xml_file_to_json():
    import json, xmltodict
    from google.cloud import storage

    with open(xml_file_path_gcs) as xml_file:
        data_dict = xmltodict.parse(xml_file.read())
        xml_file.close()

    json_data = json.dumps(data_dict)

    client = storage.Client()
    bucket = client.get_bucket("your-bucket")
    blob = bucket.blob('your_file.json')
    blob.upload_from_string(data=json.dumps(json_data), content_type='application/json')

with airflow.DAG(
        "dag",
        default_args=default_dag_args,
        schedule_interval=None) as dag:

    xml_to_json = PythonOperator(
        task_id="transform_xml_to_json",
        python_callable=transform_xml_file_to_json
    )

    gcs_to_bq = GCSToBigQueryOperator(
       task_id="gcs_to_bq",
       bucket="your-bucket",
       source_objects=['*.json'],
       destination_project_dataset_table="your_table",
       schema_object=f"dags/schema/creditsafe/{data_type}.json",
       source_format="NEWLINE_DELIMITED_JSON",
    ....,
   )

   xml_to_json >> gcs_to_bq

解决方案2:

所有工作都在

PythonOperator

  • 加载 XML 文件
  • 将其转换为
    Dicts
  • 列表
  • 使用
    Python
    客户端和
    insert_rows
    方法
dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

def load_xml_file_to_bq():
    import json, xmltodict
    from google.cloud import bigquery

    with open(xml_file_path_gcs) as xml_file:
        data_dicts = xmltodict.parse(xml_file.read())
        xml_file.close()
    
    # Check if you have to transform data_dicts to the expected list of Dict. A Dict need to match exactly with the schema of the BQ table
    client = bigquery.Client()
    client.insert_rows_json('dataset.table', data_dicts)


with airflow.DAG(
        "dag",
        default_args=default_dag_args,
        schedule_interval=None) as dag:

    load_xml_file_to_bq = PythonOperator(
        task_id="load_xml_file_to_bq",
        python_callable=load_xml_file_to_bq
    )

    load_xml_file_to_bq

检查是否必须将

data_dicts
转换为预期的
Dict
列表。列表中的
Dict
需要与
BigQuery
表的架构完全匹配。

你要小心,如果你的xml文件太大,不建议在节点中加载重元素。


0
投票

您可以使用一些在线工具来验证从 xml 生成的 JSON 输出看起来是否正常(https://www.utilities-online.info/xmltojson

例如下面的xml

<?xml version="1.0"?>
 <customers>
 <customer id="55000">
    <name>Charter Group</name>
    <address>
        <street>100 Main</street>
        <city>Framingham</city>
        <state>MA</state>
        <zip>01701</zip>
    </address>
    <address>
        <street>720 Prospect</street>
        <city>Framingham</city>
        <state>MA</state>
        <zip>01701</zip>
    </address>
    <address>
        <street>120 Ridge</street>
        <state>MA</state>
        <zip>01760</zip>
    </address>
</customer>
</customers>

将生成这种格式的 JSON

{
"customers": {
    "customer": {
    "-id": "55000",
    "name": "Charter Group",
    "address": [
        {
        "street": "100 Main",
        "city": "Framingham",
        "state": "MA",
        "zip": "01701"
        },
        {
        "street": "720 Prospect",
        "city": "Framingham",
        "state": "MA",
        "zip": "01701"
        },
        {
        "street": "120 Ridge",
        "state": "MA",
        "zip": "01760"
        }
    ]
    }
}
}

您可以通过多种方法在 Python 中进行 xml 到 Json 的转换。您可以使用 xmltodict、xml.etree.ElementTree、minidom。不同库的代码性能会有所不同。如果它是一个相对较小的 xml 文件,那么使用 xmltodict 就可以了。如果 Xml 文件太大,那么您需要考虑从计算引擎运行 Python 脚本,因为转换过程将占用大量资源。为此,您可以使用 Airflow Google Compute Engine Operators (https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/operator/gcp/compute.html) 来启动、停止 GCE 实例.

您可以在Python脚本中指定xml文件路径(如果文件存在于GCS中,则该路径将成为文件所在的GCS路径)。如果您使用 Airflow,则可以使用 GoogleCloudStorageToBigQueryOperator (https://airflow.readthedocs.io/en/1.10.7/howto/operator/gcp/gcs.html) 将生成的 Json 加载到 BigQuery

总结一下

  • 将文件从 GCS 加载到 python 脚本中
  • 将生成的 JSON 文件转储到 GCS,或使用可用的 BQ Python API 将生成的 Json 数据(Python 变量中的数据)从 Python 实用脚本直接上传到 BigQuery
  • 如果要将生成的 JSON 文件保存到 GCS,请使用 GoogleCloudStorageToBigQueryOperator 将数据加载到 BigQuery 中

我将分享一个可以使用的 Python 代码片段(用于 xml 到 JSON 转换的基本 Python 脚本)

    import json
    import xmltodict
    
    with open("xml_file.xml") as xml_file:
    
    data_dict = xmltodict.parse(xml_file.read())
    json_data = json.dumps(data_dict)
    
    with open("data.json", "w") as json_file:
        json_file.write(json_data)
        json_file.close()

© www.soinside.com 2019 - 2024. All rights reserved.