我想使用spark执行更新和插入操作 请找到现有表格的图像参考
这里我正在更新 id :101 位置和插入时间并插入另外 2 条记录:
并使用覆盖模式写入目标
df.write.format("jdbc")
.option("url", "jdbc:mysql://localhost/test")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","temptgtUpdate")
.option("user", "root")
.option("password", "root")
.option("truncate","true")
.mode("overwrite")
.save()
执行上述命令后,我的数据已损坏,插入到数据库表中
数据框中的数据
能否请您告诉我您的观察结果和解决方案
Spark JDBC writer 支持以下模式:
错误(默认情况):如果数据已存在则抛出异常
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
由于您使用的是“覆盖”模式,因此它会根据列长度重新创建表,如果您想要自己的表定义,请先创建表并使用“追加”模式
我想使用spark执行更新和插入操作
Spark SQL 中没有与 SQL
UPDATE
语句等效的内容。 Spark SQL 中也没有等效的 SQL DELETE WHERE
语句。相反,您必须删除 Spark 外部需要更新的行,然后使用 append
模式将包含新记录和更新记录的 Spark 数据帧写入表中(以便保留表中剩余的现有行)。
如果您需要在 pyspark 代码中执行 UPSERT / DELETE 操作,我建议您使用 pymysql libary,并执行 upsert/delete 操作。请查看这篇文章以获取更多信息,并参考代码示例:Error while using INSERT INTO table ON DUPLICATE KEY, using a for循环数组
请根据您的需要修改代码示例。
我也在研究如何在不删除表的情况下实现数据加载。所以,手头只有两个选择:
我能够实现第二个,由于表定义没有改变,所以要好得多。我不确定谁会想要第一个选择。
像这样 - dataframe.write.option("overwrite", "true").option("truncate", "true").jdbc(...)
我不会推荐 TRUNCATE,因为它实际上会删除表并创建新表。执行此操作时,表可能会丢失之前设置的列级属性...因此在使用 TRUNCATE 时要小心,并确定是否可以删除表/重新创建表。
按照以下步骤操作时,更新插入逻辑工作正常
df = (spark.read.format("csv").
load("file:///C:/Users/test/Desktop/temp1/temp1.csv", header=True,
delimiter=','))
并且这样做
(df.write.format("jdbc").
option("url", "jdbc:mysql://localhost/test").
option("driver", "com.mysql.jdbc.Driver").
option("dbtable", "temptgtUpdate").
option("user", "root").
option("password", "root").
option("truncate", "true").
mode("overwrite").save())
但是,当我直接使用数据框编写时,我无法理解为什么它失败的逻辑