使用数据类型映射将数据帧写入csv 在Spark中

问题描述 投票:0回答:2

我有一个文件是file1snappy.parquet。它有一个复杂的数据结构,如地图,里面的数组。经过处理,我得到了最终的结果。当写入结果到csv我得到一些错误说

"Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support map<string,bigint> data type."

我用的代码:

val conf=new SparkConf().setAppName("student-example").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
    val datadf = sqlcontext.read.parquet("C:\\file1.snappy.parquet")
    def sumaggr=udf((aggr: Map[String, collection.mutable.WrappedArray[Long]]) => if (aggr.keySet.contains("aggr")) aggr("aggr").sum else 0)
datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)
    datadf.write.format("com.databricks.spark.csv").option("header", "true").save("C:\\myfile.csv")

我尝试转换datadf.toString(),但我仍面临同样的问题。如何将结果写入CSV。

apache-spark spark-dataframe rdd
2个回答
2
投票

Spark CSV源仅支持原子类型。您不能存储任何非原子的列

我认为最好是为map<string,bigint>作为数据类型的列创建一个JSON,并将其保存在csv中,如下所示。

import spark.implicits._ 
import org.apache.spark.sql.functions._

datadf.withColumn("column_name_with_map_type", to_json(struct($"column_name_with_map_type"))).write.csv("outputpath")

希望这可以帮助!


1
投票

您正试图保存输出

val datadf = sqlcontext.read.parquet("C:\\file1.snappy.parquet")

我猜这是一个错误,因为如果你这样做,udf函数和所有聚合都会徒劳无功

所以我认为你想保存输出

datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)

因此,您需要将其保存在新的数据帧变量中,并使用该变量进行保存。

val finalDF = datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0)
finalDF.write.format("com.databricks.spark.csv").option("header", "true").save("C:\\myfile.csv")

而你应该没事。

© www.soinside.com 2019 - 2024. All rights reserved.