如果外部数据加载满足特定条件,我们会尝试更新我们的表。如果我们尝试使用我们的解决方案,代码会遇到几个错误。
输出应该更新我们的 Spark 表列,其中外部列在列记录模式下有一个空值
if initialLoad:
df = pd.DataFrame(sdf, columns = ['RBR_Name', 'RECORDMODE', 'RBR_Kennzahl'])
df['RBR_Kennzahl'] = df['RBR_Kennzahl'].astype(float)#convert datatype of 'Kennzahl'
df = df.drop(['RECORDMODE'], axis = 1)
append_records(df, dbWriteSchema, dbWriteTable)
update_last_recquest_table(dfRequests, dbWriteSchema, dbWriteTable, initialLoad)
else:
df = pd.DataFrame(sdf, columns = ['REQTSN','DATAPAKID','RECORD','RECORDMODE','RBR_NAME', 'RBR_Kennzahl'])
df['RBR_Kennzahl'] = df['RBR_Kennzahl'].astype(float)
dfAllReq = df['REQTSN'].unique()
sdfRequest = spark.read.table(dbWriteSchema+".tbl_rbr_requests")
`sdfRequest = sdfRequest.where(col("TBLNAME") == dbWriteTable)
sdfRequest.show()
lastRequest = sdfRequest.agg(min("REQTSN")).head()
lastRequest = sdfRequest.first()
dfAllReq = pd.DataFrame(dfAllReq, columns=["REQTSN"])
df['R_Kennzahl'] = df['R_Kennzahl'].astype(float)
#get list of all Requests in Change Log
dfAllReq = df['REQTSN'].unique()
#remove previously imported requests from list
sdfRequest = spark.read.table(dbWriteSchema+".tbl_rbr_requests")
sdfRequest = sdfRequest.where(col("TBLNAME") == dbWriteTable)
lastRequest = sdfRequest.agg(min("REQTSN")).head()
lastRequest = sdfRequest.first()
dfAllReq = pd.DataFrame(dfAllReq, columns=["REQTSN"])
dfNewReq = dfAllReq[ dfAllReq['REQTSN'] > lastRequest['REQTSN']]
if not dfNewReq.empty:
#update sql table for each request and recordmode
for request in dfNewReq['REQTSN']:
print("Request =" + str(request))
#insert new records (Recordmode = N)
new_df = df[(df['REQTSN'] == request) & (df['RECORDMODE'] == "N")]
new_df = new_df.drop(['REQTSN','DATAPAKID','RECORD','RECORDMODE'], axis = 1)
append_records(new_df, dbWriteSchema, dbWriteTable)
#upadte changed records (Recordmode = "")
`update_df = df[(df['REQTSN'] == request) & (df['RECORDMODE'] == "")]
#update_records_merge(update_df, dbWriteSchema, dbWriteTable)
if not update_df.empty:
update_df = update_df.drop(['REQTSN','DATAPAKID','RECORD','RECORDMODE'], axis = 1)
print(update_df)
sdf = spark.createDataFrame(update_df)
sdf.show()
dbWritePath = "abfss://[email protected]/" + dbWriteSchema + ".Lakehouse/Tables/" + dbWriteTable
matchCondition = "oldData.Name = newData.Name_n"
deltaTable = DeltaTable.forPath(spark, dbWritePath)
deltaTable.alias('oldData') \
.merge( \
sdf.alias('newData'), 'oldData.RBR_Name = newData.R_Name')\
.whenMatchedUpdate(set = {'oldData.RBR_Kennzahl': 'newData.R_Kennzahl' }) \
.execute()
错误:
AnalysisException Traceback (most recent call last)
Cell In[31], line 26
24 new_df = df[(df['REQTSN'] == request) & (df['RECORDMODE'] == "N")]
25 new_df = new_df.drop(['REQTSN','DATAPAKID','RECORD','RECORDMODE'], axis = 1)
---> 26 append_records(new_df, dbWriteSchema, dbWriteTable)
28 #upadte changed records (Recordmode = "")
29 update_df = df[(df['REQTSN'] == request) & (df['RECORDMODE'] == "")]
Cell In[10], line 40, in append_records(df, dbWriteSchema, dbTable)
38 if not df.empty:
39 sdf = spark.createDataFrame(df)
---> 40 sdf.write.mode("append").format("delta").saveAsTable(dbWriteSchema + "." + dbTable)
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:1521, in DataFrameWriter.saveAsTable(self, name, format, mode, partitionBy, **options)
1519 if format is not None:
1520 self.format(format)
-> 1521 self._jwrite.saveAsTable(name)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
171 converted = convert_exception(e.java_exception)
172 if not isinstance(converted, UnknownException):
173 # Hide where the exception came from that shows a non-Pythonic
174 # JVM exception message.
--> 175 raise converted from None
176 else:
177 raise
AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 989f009d-f6d6-4787-8c12-b0edaeafd503).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.
Table schema:
root
-- RBR_Name: string (nullable = true)
-- RBR_Kennzahl: double (nullable = true)
Data schema:
root
-- R_Name: string (nullable = true)
-- R_Kennzahl: double (nullable = true)
测试一下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import *
spark = SparkSession.builder.appName("Your App Name").getOrCreate()
dbWriteSchema = "your_schema"
dbWriteTable = "your_table"
if initialLoad:
df = spark.createDataFrame(sdf.select(['RBR_Name', 'RECORDMODE', 'RBR_Kennzahl']).where(col("RECORDMODE") == ""))
df = df.withColumn("RBR_Kennzahl", df["RBR_Kennzahl"].cast("float"))
df = df.drop("RECORDMODE")
else:
df = spark.createDataFrame(sdf.select(['REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE', 'RBR_NAME', 'RBR_Kennzahl']))
df = df.withColumn("RBR_Kennzahl", df["RBR_Kennzahl"].cast("float"))
dfAllReq = df.select("REQTSN").distinct()
sdfRequest = spark.read.table(f"{dbWriteSchema}.tbl_rbr_requests").where(col("TBLNAME") == dbWriteTable)
lastRequest = sdfRequest.agg({"REQTSN": "min"}).collect()[0][0]
dfNewReq = dfAllReq.filter(dfAllReq["REQTSN"] > lastRequest)
if dfNewReq.count() > 0:
for request in dfNewReq.collect():
request_id = request["REQTSN"]
new_df = df.filter((df["REQTSN"] == request_id) & (df["RECORDMODE"] == "N"))
if new_df.count() > 0:
new_df = new_df.drop('REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE')
append_records(new_df, dbWriteSchema, dbWriteTable)
update_df = df.filter((df["REQTSN"] == request_id) & (df["RECORDMODE"] == ""))
if update_df.count() > 0:
update_df = update_df.drop('REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE')
dbWritePath = f"abfss://[email protected]/{dbWriteSchema}.Lakehouse/Tables/{dbWriteTable}"
deltaTable = DeltaTable.forPath(spark, dbWritePath)
deltaTable.alias('oldData').merge(
update_df.alias('newData'),
'oldData.RBR_Name = newData.RBR_Name'
).whenMatchedUpdate(set={
'oldData.RBR_Kennzahl': col('newData.RBR_Kennzahl')
}).execute()