如何使用scala将Map保存到Spark中的Json?

问题描述 投票:0回答:3

我需要使用 Spark 将 Map(键值对)保存在一列中。要求是其他人可能会使用PIG等其他工具来使用数据,因此最好使用通用格式而不是特殊格式的字符串来保存Map。我使用以下代码创建该列:

StructField("cMap", DataTypes.createMapType(StringType, StringType), true) ::

创建数据框后,我得到了架构:

|-- cMap: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)

然后我将数据帧保存为 Json:

df.write.json(path)

我发现Json输出是:

"cMap":{"1":"a","2":"b","3":"c"}

所以下次我从文件中读取它时:

val new_df = sqlContext.read.json(path)

我得到了架构:

|-- cMap: struct (nullable = true)
|    |-- 1: string
|    |-- 2: string
|    |-- 3: string

是否有任何有效的方法可以在 Json 中保存和读取地图而无需额外处理(我可以将地图保存到一个特殊的字符串中并对其进行解码,但我认为它不应该那么复杂)。谢谢。

java sql json scala apache-spark
3个回答
0
投票

您可以将表格另存为

parquet
文件

  • 写:

    df.write.parquet(“mydf.parquet”)

  • 阅读

    val new_df = Spark.read.parquet("mydf.parquet")

spark 指南保存模式

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()

0
投票

Parquet
格式应该可以解决您遇到的问题。
Parquet stores binary data in a column-oriented way, where the values of each column are organized so that they are all adjacent, enabling better compression

只需将其保存到

Parquet
,如下所示

df.write.mode(SaveMode.Overwrite).parquet("path to the output")

并阅读如下

val new_df = sqlContext.read.parquet("path to the above output")

我希望这有帮助


0
投票

我是这样做的:

df.withColumn("map_string_string_column", from_json(col("map_string_string_column").cast("string"), MapType(StringType, StringType)))
.withColumn("json_string", to_json($"map_string_string_column"))

然后我使用jdbc存储它

df.write
        .format("jdbc")
        .option("driver", "mydriver"))
        .option("user", "myuser")
        .option("password", "mypass")
        .option("url", "jdbc:postgresql://.../mydb")
        .option("dbtable", "mydbtable")
        .option("stringtype", "unspecified")
        .mode("append")
        .save()

我首先解析要映射的字符串,然后映射到字符串的原因是为了正确解析字段。如果没有它,它会存储一些二进制文件而不是解析的字符串。

如果您的输入已经是地图,则不需要

from_json
部分。

© www.soinside.com 2019 - 2024. All rights reserved.