这对我来说是一个真正的谜:当我尝试使用 jdbc 将 pyspark df 写入 Azure dataframe 时,我遇到了一个奇怪的情况。在运行“写入”功能时,我的表在没有任何原因的情况下以某种方式发生了更改,并向Azure发送了错误的数据。然后它用相同的错误数据保存我的 pyspark df 。这是我编写的部分代码:
print(sparkDF_cleaned.show())
sparkDF_cleaned.write \
.format("jdbc") \
.mode("overwrite")
.option("url", jdbcUrl) \
.option("dbtable", "dbo.upsert_test") \
.option("user", jdbcUsername) \
.option("password", jdbcPassword) \
.save()
print(f"data loaded to table {db_table_name}")
print(sparkDF_cleaned.show())
接下来输出:
sparkDF_cleaned :
+------------+---+-----+----------+----------------------+
| id_date| id|value| _date|datetime_of_extraction|
+------------+---+-----+----------+----------------------+
|1 2022-05-01| 1| 17|2022-05-01| 2022-06-01|
|1 2022-05-06| 1| 6|2022-05-06| 2022-06-13|
|2 2022-05-02| 2| 10|2022-05-02| 2022-06-01|
|3 2022-05-03| 3| 15|2022-05-03| 2022-06-01|
+------------+---+-----+----------+----------------------+
data loaded to table upsert_test
sparkDF_cleaned :
+------------+---+-----+----------+----------------------+
| id_date| id|value| _date|datetime_of_extraction|
+------------+---+-----+----------+----------------------+
|1 2022-05-06| 1| 6|2022-05-06| 2022-06-13|
|2 2022-05-02| 2| 5|2022-05-02| 2022-06-13|
+------------+---+-----+----------+----------------------+
Azure 表接收第二个表中的数据。亲爱的大学,为什么会发生这种情况? 感谢您提前抽出时间。
使用 pyspark jdbc 与 pyspark 转换表覆盖时,我仍然无法避免问题。
但是,我决定将 pyspark 表写入本地文件,从文件中读取保存的数据,然后使用 pyspark jdbc 覆盖功能发布数据。
这似乎不是最好的解决方案,但它确实有效。如果有更好的解决方案,将很乐意阅读。
path = "*DataBricksWorkspace*/temp_table.json"
dbutils.fs.rm(path, True)
sparkDF.write.json(path)
sparkDF_copy = spark.read.json(file)
sparkDF_copy.write.jdbc(url=jdbcUrl,
table=db_table_name,
mode="overwrite",
properties=connectionProperties)
这是一个很晚的回复,但没有其他线程提到这一点,我们遇到了同样的问题,并深入探讨了导致此问题的原因以及如何以最有效的方式解决它。
我们认为,当使用 jdbc 连接从 sql 数据库读取数据,然后写入同一个 sql 数据库时,会出现该问题(可能该问题也可能由不同方法的组合引起)。对数据库的突变在上传之前应用于数据帧。因此,当将数据附加到 sql db 时,数据会附加到 pyspark 数据帧,然后写入。这是不希望有的效果,因为您将数据两次添加到 db。同样,覆盖时它将删除 pyspark 数据帧,然后上传一个空的 pyspark 数据帧,从而生成一个空数据库。当读取和写入之间发生突变时,甚至会发生更有趣的事情,因为这些突变在写入期间发生在 df 上两次。
无论如何,为什么会发生这种情况仍然是一个谜。但我们怀疑这是 pyspark 多核处理、pyspark 惰性求值方法和对同一 sql 数据库的双连接(因为读写)之间的组合。
因此,为了解决这个问题,您必须以某种方式将读取数据帧的引用与您正在写入的数据帧断开连接,从而导致惰性评估一路返回。您的方法有效,因为它完全阻止 pyspark 理解您试图从您尝试读取的数据帧中写回该数据帧的方式。另一种有效的方法是将 pyspark df 转换为 pandas 然后再传回。
我们解决这个问题的最简单有效的方法是同时使用 .persist() 和 .count()。第一个从该点开始缓存数据帧,第二个触发一个操作,以便 pyspark 被迫对数据帧进行计算。我们选择这两个是因为我们认为它们的计算强度最小。