我必须将 Spark (Scala) 中的 json 字符串插入 Postgres 中 JSONB 类型的列中。我必须单独计算该字符串,然后将该值作为新列添加到数据框中。所以使用 df.withColumn 添加如下
df.withColumn("transaction_count", lit(transactionCount))
以下是 transactionCount 的示例值
{"OFFLINE": {"CHEQUE": 5}, "ONLINE": {"CREDIT": 135, "DEBIT": 297}}
但是插入 Postgres 时出现以下错误
Caused by: org.postgresql.util.PSQLException: ERROR: column "transaction_count" is of type jsonb but expression is of type character varying
我还尝试将 transactionCount 转换为 JsObject (使用 io.spray)依赖项。但它抛出了不同的错误
ERROR - org.apache.spark.SparkRuntimeException: The feature is not supported: literal for '{"ADHOC":{"AHP":5},"ADJUSTMENT":{"ADJ":7},"ROYALTY":{"WEA":135,"WEP":297}}' of class spray.json.JsObject.
知道如何从 Spark 插入 Postgres 中的 JSONB 列。下面是用于插入 Postgres 表的示例代码
-- table in postgres
CREATE TABLE jsonb_table (
id INTEGER,
sample_name VARCHAR(50),
json_data JSONB
);
val st = """{"OFFLINE": {"CHEQUE": 5}, "ONLINE": {"CREDIT": 135, "DEBIT": 297}}"""
val jsonData = Seq((1, "name1"), (2, "name2"))
val jsonDF = spark.createDataFrame(jsonData).toDF("id", "sample_name")
val dfWithJSONB = jsonDF.withColumn("json_data", lit(st))
val jdbcUrl = "jdbc:postgresql://localhost:5432/db"
dfWithJSONB.write
.format("jdbc")
.option("url", jdbcUrl)
.option("user", "postgres")
.option("password", "password")
.option("dbtable", "jsonb_table")
.option("driver", "org.postgresql.Driver")
.mode("append")
.save()
谢谢
我是这样做的:
df.withColumn("map_string_string_column", from_json(col("map_string_string_column").cast("string"), MapType(StringType, StringType)))
.withColumn("json_string", to_json($"map_string_string_column"))
然后我使用jdbc存储它:
df.write
.format("jdbc")
.option("driver", "mydriver"))
.option("user", "myuser")
.option("password", "mypass")
.option("url", "jdbc:postgresql://.../mydb")
.option("dbtable", "mydbtable")
.option("stringtype", "unspecified")
.mode("append")
.save()
我首先解析要映射的字符串,然后映射到字符串的原因是为了正确解析字段。如果没有它,它会存储一些二进制文件而不是解析的字符串。
如果您的输入已经是地图,则不需要
from_json
部分。
警告: 将其存储到数据库后,检查是否有空值。如果解析失败,它可能会插入 null 而不会引发错误。