批量从Dataframe插入到DB,忽略Pyspark中的失败行

问题描述 投票:1回答:2

我试图使用JDBC写入向Postgres插入spark DF。 postgres表对其中一列有唯一约束,当要插入的df违反约束时,整个批次被拒绝,火花会话关闭,给出错误重复键值违反唯一约束,这是正确的,因为数据是重复的(已经存在)在数据库中)org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148

需要插入不违反约束的数据行并忽略失败的行,而不会使整个批处理失败。

使用的代码是:

mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"} 
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)

我怎样才能做到这一点?

postgresql jdbc pyspark
2个回答
1
投票

不幸的是,Spark没有开箱即用的解决方案。我看到了许多可能的解决方案:

  1. 在PostgreSQL数据库中实现冲突解决的业务逻辑,作为forEachPartition函数的一部分。例如,捕获约束违例的异常,然后报告给日志。
  2. 删除PostgreSQL数据库上的约束,使用自动生成的PK意味着允许在数据库中存储重复的行。重复数据删除逻辑可以进一步实现为每个SQL查询的一部分或每天/每小时运行重复数据删除。你可以看到例子here
  3. 如果除了您的Spark作业之外没有其他系统或进程写入PostgreSQL表,则可以使用join操作进行过滤,以便在spark.write类似于this之前从Spark Dataframe中删除所有现有行

我希望我的想法会有所帮助。


0
投票

如果您对目标有唯一约束,那么这是不可能的。目前没有这些技术的UPSert模式。你需要围绕这个方面进行设计。

© www.soinside.com 2019 - 2024. All rights reserved.