我需要使用 Spark 将 Map(键值对)保存在一列中。要求是其他人可能会使用PIG等其他工具来使用数据,因此最好使用通用格式而不是特殊格式的字符串来保存Map。我使用以下代码创建该列:
StructField("cMap", DataTypes.createMapType(StringType, StringType), true) ::
创建数据框后,我得到了架构:
|-- cMap: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
然后我将数据帧保存为 Json:
df.write.json(path)
我发现Json输出是:
"cMap":{"1":"a","2":"b","3":"c"}
所以下次我从文件中读取它时:
val new_df = sqlContext.read.json(path)
我得到了架构:
|-- cMap: struct (nullable = true)
| |-- 1: string
| |-- 2: string
| |-- 3: string
是否有任何有效的方法可以在 Json 中保存和读取地图而无需额外处理(我可以将地图保存到一个特殊的字符串中并对其进行解码,但我认为它不应该那么复杂)。谢谢。
您可以将表格另存为
parquet
文件
写:
df.write.parquet(“mydf.parquet”)
阅读
val new_df = Spark.read.parquet("mydf.parquet")
// Encoders for most common types are automatically provided by importing spark.implicits._ import spark.implicits._ val peopleDF = spark.read.json("examples/src/main/resources/people.json") // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write.parquet("people.parquet") // Read in the parquet file created above // Parquet files are self-describing so the schema is preserved // The result of loading a Parquet file is also a DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show()
Parquet
格式应该可以解决您遇到的问题。 Parquet stores binary data in a column-oriented way, where the values of each column are organized so that they are all adjacent, enabling better compression
只需将其保存到
Parquet
,如下所示
df.write.mode(SaveMode.Overwrite).parquet("path to the output")
并阅读如下
val new_df = sqlContext.read.parquet("path to the above output")
我希望这有帮助
我是这样做的:
df.withColumn("map_string_string_column", from_json(col("map_string_string_column").cast("string"), MapType(StringType, StringType)))
.withColumn("json_string", to_json($"map_string_string_column"))
然后我使用jdbc存储它
df.write
.format("jdbc")
.option("driver", "mydriver"))
.option("user", "myuser")
.option("password", "mypass")
.option("url", "jdbc:postgresql://.../mydb")
.option("dbtable", "mydbtable")
.option("stringtype", "unspecified")
.mode("append")
.save()
我首先解析要映射的字符串,然后映射到字符串的原因是为了正确解析字段。如果没有它,它会存储一些二进制文件而不是解析的字符串。
如果您的输入已经是地图,则不需要
from_json
部分。