如何将spark中的json字符串插入到postgres中jsonb类型的列中

问题描述 投票:0回答:1

我必须将 Spark (Scala) 中的 json 字符串插入 Postgres 中 JSONB 类型的列中。我必须单独计算该字符串,然后将该值作为新列添加到数据框中。所以使用 df.withColumn 添加如下

df.withColumn("transaction_count", lit(transactionCount)) 

以下是 transactionCount 的示例值

{"OFFLINE": {"CHEQUE": 5}, "ONLINE": {"CREDIT": 135, "DEBIT": 297}}

但是插入 Postgres 时出现以下错误

Caused by: org.postgresql.util.PSQLException: ERROR: column "transaction_count" is of type jsonb but expression is of type character varying

我还尝试将 transactionCount 转换为 JsObject (使用 io.spray)依赖项。但它抛出了不同的错误

ERROR - org.apache.spark.SparkRuntimeException: The feature is not supported: literal for '{"ADHOC":{"AHP":5},"ADJUSTMENT":{"ADJ":7},"ROYALTY":{"WEA":135,"WEP":297}}' of class spray.json.JsObject.

知道如何从 Spark 插入 Postgres 中的 JSONB 列。下面是用于插入 Postgres 表的示例代码

-- table in postgres
CREATE TABLE jsonb_table (
  id           INTEGER,
  sample_name  VARCHAR(50),
  json_data    JSONB
);


val st = """{"OFFLINE": {"CHEQUE": 5}, "ONLINE": {"CREDIT": 135, "DEBIT": 297}}"""
val jsonData = Seq((1, "name1"), (2, "name2"))
val jsonDF = spark.createDataFrame(jsonData).toDF("id", "sample_name")
val dfWithJSONB = jsonDF.withColumn("json_data", lit(st))

val jdbcUrl = "jdbc:postgresql://localhost:5432/db"

dfWithJSONB.write
      .format("jdbc")
      .option("url", jdbcUrl)
      .option("user", "postgres")
      .option("password", "password")
      .option("dbtable",  "jsonb_table")
      .option("driver", "org.postgresql.Driver")
      .mode("append")
      .save()


谢谢

postgresql scala apache-spark pyspark apache-spark-sql
1个回答
0
投票

我是这样做的:

df.withColumn("map_string_string_column", from_json(col("map_string_string_column").cast("string"), MapType(StringType, StringType)))
.withColumn("json_string", to_json($"map_string_string_column"))

然后我使用jdbc存储它:

df.write
        .format("jdbc")
        .option("driver", "mydriver"))
        .option("user", "myuser")
        .option("password", "mypass")
        .option("url", "jdbc:postgresql://.../mydb")
        .option("dbtable", "mydbtable")
        .option("stringtype", "unspecified")
        .mode("append")
        .save()

我首先解析要映射的字符串,然后映射到字符串的原因是为了正确解析字段。如果没有它,它会存储一些二进制文件而不是解析的字符串。

如果您的输入已经是地图,则不需要

from_json
部分。

警告: 将其存储到数据库后,检查是否有空值。如果解析失败,它可能会插入 null 而不会引发错误。

© www.soinside.com 2019 - 2024. All rights reserved.