如何使用spark-xml在pyspark中正确读取嵌套xml?

问题描述 投票:0回答:1

我有一个如下所示的 xml 文件。

<document>
  <root dataid="2000" path="U.S. Broadline OpCos:Eastern Maryland:Documents-Standard:Finance:Accounts Receivables">
    <documentnode name="123.pdf">
      <version filepath="data.pdf" mimetype="application/pdf"></version>
      <categories>
        <category name="Customer Delivery">
          <attribute name="Invoice Number">1234</attribute>
          <attribute name="Customer Number">543</attribute>
          <attribute name="Document Type">Original Customer Invoice</attribute>
          <attribute name="Capture Date" dateformat="yyyyMMdd">20230914</attribute>
          <attribute name="Location Name">Eastern Maryland</attribute>
          <attribute name="Location Number">21</attribute>
          <attribute name="Ship to Customer Name">Jill</attribute>
          <attribute name="Bill to Customer Name">Jill</attribute>
          <attribute name="Delivery Date" dateformat="yyyyMMdd">20230909</attribute>
          <attribute name="Territory Number">ART</attribute>
          <attribute name="Manifest Number">435</attribute>
          <attribute name="Route Number">76543</attribute>
          <attribute name="Invoice Type">Priced</attribute>
        </category>
      </categories>
    </documentnode>
  </root>
</document>

下面是我读取此 xml 的代码。出于某种原因,我将 xml 读取为 Spark 数据帧,并将其转换回 pandas 数据帧。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Nested XML to DataFrame").getOrCreate()
df = spark.read.format("com.databricks.spark.xml")\
    .option("rootTag", "document") \
    .option("rowTag", "documentnode")\
    .load("test.xml")
ss_pandas_df = df.toPandas()
print(ss_pandas_df.head(1))

输出如下所示

 _name                                         categories                            version
0  123.pdf  ((Customer Delivery, [Row(_VALUE='1234', _date...  (None, data.pdf, application/pdf)

当我尝试打印类别列数据时

print(ss_pandas_df.iloc[0]['categories'])
,它看起来像这样

Row(category=Row(_name='Customer Delivery', attribute=[Row(_VALUE='1234', _dateformat=None, _name='Invoice Number'), Row(_VALUE='543', _dateformat=None, _name='Customer Number'), Row(_VALUE='Original Customer Invoice', _dateformat=None, _name='Document Type'), Row(_VALUE='20230914', _dateformat='yyyyMMdd', _name='Capture Date'), Row(_VALUE='Eastern Maryland', _dateformat=None, _name='Location Name'), Row(_VALUE='21', _dateformat=None, _name='Location Number'), Row(_VALUE='Jill', _dateformat=None, _name='Ship to Customer Name'), Row(_VALUE='Jill', _dateformat=None, _name='Bill to Customer Name'), 
Row(_VALUE='20230909', _dateformat='yyyyMMdd', _name='Delivery Date'), Row(_VALUE='ART', _dateformat=None, _name='Territory Number'), Row(_VALUE='435', _dateformat=None, _name='Manifest Number'), Row(_VALUE='76543', _dateformat=None, _name='Route Number'), Row(_VALUE='Priced', _dateformat=None, _name='Invoice Type')]))

但这不是我所期待的。我需要将以下属性作为单独的列,其列名称如发票编号/客户编号等。 我在这里缺少什么

注意:我也添加了spark-xml包

pandas pyspark aws-glue apache-spark-xml
1个回答
1
投票

我希望我能帮助你,或者至少为你指明正确的方向。

对于嵌套结构,您必须逐步溶解各层。创建数据框时,架构将显示在下面。

这里区分数组和结构体很重要。 结构体可以用“select”表达式来解析,数组可以用“explode”函数来解析。

df_categories = df.select("categories.*")

通过此“选择”,您可以明确选择“类别”列及其所有值。但请注意,您删除了所有其他列。如果您想保留它们,您也必须指定这一点。

结果将如下所示:

至此,我们已经稍微解决了整个问题,但这对我们来说仍然不够。如果我们也溶解底层结构,我们就会将更多的结构带入其中。

现在我们在最高层有一个数组,我们必须爆炸它。为此,必须事先导入“爆炸”功能。

现在我们已经到达了最低水平。您现在要做的就是将行旋转为列。

希望我能帮到你。

编辑:

您说您希望将“_name”中的值作为列,但我不知道这在这种情况下有多大用处。 您可以使用以下代码进行透视:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Hinzufügen einer Spalte mit eindeutiger Identifikation für jede Zeile
df_attributes = df_attributes.withColumn("row_id", F.monotonically_increasing_id())

# Verwende die "pivot" Funktion, um die Einträge in "_name" als Spalten zu definieren
pivot_df = df_attributes.groupBy("row_id").pivot("_name").agg(F.first("_VALUE"))

# Optional: Fehlende Werte mit 0 füllen
pivot_df = pivot_df.fillna(0)

pivot_df.display()

在我看来,直接从“_name”列访问值更有意义。

希望我能帮到你。

© www.soinside.com 2019 - 2024. All rights reserved.