How to load a complex XML file containing multiple row tags into a DataFrame using Spark Scala and save it as a table (looking for a generic solution)

Problem description

This is a sample XML file with 2 row tags. Is there any method to load this into a DataFrame with n row tags, or to select elements using XPath, in Spark Scala?

<book id="0">
    <author>Matthew</author>
    <publish_date>Sun Oct 01 00:00:00 EDT 2000</publish_date>
    <description>An in-depth look at creating applications with XML. 
    </description>
    <price id="1">
        <price>44.95</price>
        <genre>Computer</genre>
        <title>XML Developer's Guide</title>
    </price>
</book>
xml scala apache-spark-sql bigdata xpath-2.0
1 Answer
  1. You can create a schema for the above XML file as shown below. Note that the price value 44.95 is not an integer, so DoubleType is used rather than LongType:

         import org.apache.spark.sql.types._

         val innerSchema = StructType(
           StructField("price", ArrayType(
             StructType(
               StructField("price", DoubleType, true) ::
               StructField("genre", StringType, true) ::
               StructField("title", StringType, true) :: Nil
             )
           ), true) :: Nil
         )

         val schema = StructType(
           StructField("author", StringType, true) ::
           StructField("publish_date", StringType, true) ::
           StructField("description", StringType, true) ::
           StructField("price", innerSchema, true) :: Nil
         )
  2. Apply this schema when reading the XML file. The rowTag option must match the row element in the file, which is "book" here (for the n-row-tags case from the question, see the sketch after this list):

         val df = spark.read.format("com.databricks.spark.xml")
           .option("rowTag", "book")
           .schema(schema)
           .load(xmlFile)
           // select the nested field and expand it to get the flattened result
           .select("author", "publish_date", "description", "price.*")

     You can also let Spark infer the schema itself and get the same result:

         val df = spark.read.format("com.databricks.spark.xml")
           .option("rowTag", "book")
           //.schema(schema)
           .load(xmlFile)
           .select("author", "publish_date", "description", "price.*")
  3. Save the data to a Hive table:

         import org.apache.spark.sql.SaveMode

         df.write.mode(SaveMode.Overwrite).saveAsTable("dbName.tableName")
  4. You can also save the DF to a database, as shown below:

         // create a properties object
         val prop = new java.util.Properties
         prop.setProperty("driver", "com.mysql.jdbc.Driver")
         prop.setProperty("user", "root")
         prop.setProperty("password", "pw")

         // jdbc mysql url - the destination database is named "data"
         val url = "jdbc:mysql://localhost:3306/data"

         // destination database table
         val table = "sample_data_table"

         // write data from the spark dataframe to the database
         df.write.mode("append").jdbc(url, table, prop)
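
Regarding the generic n-row-tags part of the question: spark-xml reads a single rowTag per pass, so one option is simply to read the same file once per tag. A minimal sketch, assuming the spark-xml package is on the classpath; xmlFile and the tag names are placeholders for your own file and tags:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().appName("xml-demo").getOrCreate()
    val xmlFile = "books.xml"  // placeholder path to the XML above

    // One read per row tag; each read returns a DataFrame of that tag's elements.
    val rowTags = Seq("book", "price")
    val framesByTag: Map[String, DataFrame] = rowTags.map { tag =>
      tag -> spark.read
        .format("com.databricks.spark.xml")
        .option("rowTag", tag)
        .load(xmlFile)
    }.toMap

    // Inspect what was parsed for a given tag.
    framesByTag("book").printSchema()

Each resulting DataFrame can then be flattened and saved exactly as in steps 3 and 4 above.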

Update:

Saving the DataFrame as CSV

  1. You can use databricks spark-csv (https://github.com/databricks/spark-csv):

         df.write.format("com.databricks.spark.csv").save(filepath)
  2. With Spark 2.x the spark-csv package is not needed, as it is included in Spark:

         df.write.format("csv").save(filepath)
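
In practice you will usually also want to control the save mode and emit a header row; a small usage sketch (the output path is a placeholder):

    df.write
      .mode("overwrite")          // replace any existing output
      .option("header", "true")   // write column names as the first row
      .csv("/tmp/books_csv")      // placeholder output directory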

Please refer to https://github.com/databricks/spark-xml for more details. Hope it helps!
