我在使用 PySpark(Python) 展平 XML 文件时遇到多生成器问题。
XML 有 2 个同一级别的数组。 如果您有任何其他解决方案来扁平化此 XML,请分享。
请帮忙。
谢谢, 山姆
错误: AnalysisException: [UNSUPPORTED_GENERATOR.MULTI_GENERATOR] 不支持生成器:每个 SELECT 子句只允许一个生成器,但发现 2 个:“generatorouter(explode(Child1.Child2.Child21))”、“generatorouter(explode(Child1.Child3.Child31))”。
源代码:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType
from pyspark.sql.functions import explode_outer
def flatten(df):
    """Fully flatten a nested DataFrame.

    Repeatedly explodes array columns until the schema contains no
    ArrayType, then renames every nested struct leaf to an
    underscore-separated top-level column.
    """
    result = df
    # Crude but effective termination test: the string rendering of the
    # schema mentions "ArrayType(" while any array column remains.
    while "ArrayType(" in f"{result.schema}":
        result = result.selectExpr(_explodeArrays(element=result.schema))
    # No arrays left -- collapse the remaining struct nesting.
    return result.selectExpr(flattenExpr(result.schema))
def _explodeArrays(element, root=None):
    """Recursively build selectExpr strings, exploding at most ONE array.

    Spark allows only a single generator (explode) per SELECT clause.
    The previous version emitted an ``explode_outer`` for every array in
    the schema, so two sibling arrays raised
    ``AnalysisException [UNSUPPORTED_GENERATOR.MULTI_GENERATOR]``.
    This version emits the first explode it finds and selects any further
    arrays unchanged; flatten()'s while loop picks them up on the next pass.

    element: a StructType, StructField, or leaf schema node.
    root:    dotted path of the enclosing struct, or None at the top level.
    Returns a list of SQL expression strings for DataFrame.selectExpr().
    """
    exploded = False  # True once the single allowed generator is emitted

    def _walk(node, prefix):
        nonlocal exploded
        exprs = []
        try:
            path = f"{prefix + '.' if prefix else ''}{node.name}"
        except AttributeError:
            # StructType has no .name: we are at the anonymous schema root.
            path = ""
        if type(node) == StructType:
            for field in node:
                exprs.extend(_walk(field, prefix))
        elif type(node) == StructField and type(node.dataType) == ArrayType:
            alias = path.replace('.', '_')
            if not exploded:
                exploded = True
                exprs.append(f"explode_outer({path}) as {alias}")
            else:
                # Defer this array to a later pass: a second explode in the
                # same SELECT would trigger MULTI_GENERATOR.
                exprs.append(f"{path} as {alias}")
        elif type(node) == StructField and type(node.dataType) == StructType:
            exprs.extend(_walk(node.dataType, path))
        else:
            exprs.append(f"{path} as {path.replace('.', '_')}")
        return exprs

    return _walk(element, root)
def flattenExpr(element, root=None):
    """Build selectExpr strings renaming nested struct leaves to
    underscore-separated aliases.

    Arrays should already have been exploded (see flatten); a remaining
    array contributes only its first element via ``[0]`` indexing.
    """
    try:
        current = f"{root + '.' if root else ''}{element.name}"
    except AttributeError:
        # The anonymous schema root (StructType) has no .name attribute.
        current = ""
    node_type = type(element)
    if node_type == StructType:
        out = []
        for field in element:
            out.extend(flattenExpr(field, root))
        return out
    if node_type == StructField and type(element.dataType) == StructType:
        return flattenExpr(element.dataType, current)
    if node_type == StructField and type(element.dataType) == ArrayType:
        # You should use flattenArrays to be sure this will not happen
        return flattenExpr(element.dataType.elementType, f"{current}[0]")
    # Leaf column: alias dotted path with underscores.
    return [f"{current} as {current.replace('.', '_')}"]
# --- Driver: read the XML, show before/after schemas and data -----------
spark = SparkSession.builder.getOrCreate()

path = 'Files/Test9.xml'
# spark-xml reader: each <Body> element becomes one row; namespaces ignored.
df = (
    spark.read.format('xml')
    .options(rowTag='Body', ignoreNamespace='true')
    .load(path)
)

display('******* Initial Data Frame of XML file ********')
display(df)
display('******* Initial Schema of XML file ********')
df.printSchema()

f_df = flatten(df)
display('******* Flatten Schema of XML file ********')
f_df.printSchema()
display('******* Flatten Data Frame of XML file ********')
display(f_df)
以下并不是您所要求的那种表格,而只是提供一个思路:您可以使用 iterparse() 获取各内容节点的 xpath:
import xml.etree.ElementTree as ET
import pandas as pd
def path(file_path):
    """Stream-parse an XML document and return (xpath, name, text) rows.

    file_path: filename or file-like object accepted by ET.iterparse().

    On each "start" event the element tag is pushed onto the running xpath;
    if the element carries a ``name`` attribute, a row with text=None is
    recorded and the attribute becomes the current name.  On each "end"
    event, an element with real text (not pretty-printing whitespace)
    yields a row tagged with the most recently seen ``name`` attribute.

    Fixes over the original: ``e`` could be unbound (NameError) when text
    appeared before any ``name`` attribute, and rows mixed 2- and 3-tuples
    while the consuming DataFrame expects three columns.  All rows are now
    uniform 3-tuples and the current name defaults to None.
    """
    rows = []
    xpath = []
    last_name = None  # most recent ``name`` attribute; None until one is seen
    for event, elem in ET.iterparse(file_path, events=("start", "end")):
        if event == "start":
            xpath.append(elem.tag)
            name = elem.get("name")
            if name is not None:
                last_name = name
                rows.append(("/".join(xpath), name, None))
        else:  # "end"
            text = elem.text
            # Skip whitespace-only "text" produced by pretty-printed XML.
            if text is not None and "\n " not in text:
                rows.append(("/".join(xpath), last_name, text))
            xpath.pop()
    return rows
# Turn the extracted (xpath, name, text) rows into a table and print it.
tu = path("body.xml")
xpath_frame = pd.DataFrame(tu, columns=["xpath", "name", "text"])
print(xpath_frame.to_string())
输出:
xpath name text
0 Body/Child1/Child2 ERA None
1 Body/Child1/Child2/Child21 ERA test 1
2 Body/Child1/Child2/Child21 ERA test 2
3 Body/Child1/Child3 LRA None
4 Body/Child1/Child3/Child31 LRA test 3
5 Body/Child1/Child3/Child31 LRA test 4