Merge a PySpark table if a condition is true

Problem description

We try to update our table when an external data load meets certain conditions. With our current solution, the code runs into several errors.

The result should be that the columns of our Spark table are updated wherever the external data has an empty value in the RECORDMODE column.

if initialLoad:
    df = pd.DataFrame(sdf, columns=['RBR_Name', 'RECORDMODE', 'RBR_Kennzahl'])
    df['RBR_Kennzahl'] = df['RBR_Kennzahl'].astype(float)  # convert datatype of 'Kennzahl'
    df = df.drop(['RECORDMODE'], axis=1)

    append_records(df, dbWriteSchema, dbWriteTable)
    update_last_recquest_table(dfRequests, dbWriteSchema, dbWriteTable, initialLoad)
else:
    df = pd.DataFrame(sdf, columns=['REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE', 'RBR_NAME', 'RBR_Kennzahl'])
    df['RBR_Kennzahl'] = df['RBR_Kennzahl'].astype(float)

    # get list of all requests in the change log
    dfAllReq = df['REQTSN'].unique()

    # remove previously imported requests from the list
    sdfRequest = spark.read.table(dbWriteSchema + ".tbl_rbr_requests")
    sdfRequest = sdfRequest.where(col("TBLNAME") == dbWriteTable)
    sdfRequest.show()
    lastRequest = sdfRequest.agg(min("REQTSN")).head()
    lastRequest = sdfRequest.first()

    dfAllReq = pd.DataFrame(dfAllReq, columns=["REQTSN"])
    dfNewReq = dfAllReq[dfAllReq['REQTSN'] > lastRequest['REQTSN']]

    if not dfNewReq.empty:
        # update the SQL table for each request and record mode
        for request in dfNewReq['REQTSN']:
            print("Request = " + str(request))

            # insert new records (RECORDMODE = "N")
            new_df = df[(df['REQTSN'] == request) & (df['RECORDMODE'] == "N")]
            new_df = new_df.drop(['REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE'], axis=1)
            append_records(new_df, dbWriteSchema, dbWriteTable)

            # update changed records (RECORDMODE = "")
            update_df = df[(df['REQTSN'] == request) & (df['RECORDMODE'] == "")]
            # update_records_merge(update_df, dbWriteSchema, dbWriteTable)
            if not update_df.empty:
                update_df = update_df.drop(['REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE'], axis=1)
                print(update_df)
                sdf = spark.createDataFrame(update_df)
                sdf.show()

                dbWritePath = "abfss://[email protected]/" + dbWriteSchema + ".Lakehouse/Tables/" + dbWriteTable
                matchCondition = "oldData.Name = newData.Name_n"
                deltaTable = DeltaTable.forPath(spark, dbWritePath)
                deltaTable.alias('oldData') \
                    .merge( \
                        sdf.alias('newData'), 'oldData.RBR_Name = newData.R_Name') \
                    .whenMatchedUpdate(set = {'oldData.RBR_Kennzahl': 'newData.R_Kennzahl'}) \
                    .execute()

Error:

AnalysisException                         Traceback (most recent call last)
Cell In[31], line 26
     24 new_df = df[(df['REQTSN'] == request) & (df['RECORDMODE'] == "N")]
     25 new_df = new_df.drop(['REQTSN','DATAPAKID','RECORD','RECORDMODE'], axis = 1)
---> 26 append_records(new_df, dbWriteSchema, dbWriteTable)
     28 #upadte changed records (Recordmode = "") 
     29 update_df = df[(df['REQTSN'] == request) & (df['RECORDMODE'] == "")]

Cell In[10], line 40, in append_records(df, dbWriteSchema, dbTable)
     38 if not df.empty:
     39     sdf = spark.createDataFrame(df)
---> 40     sdf.write.mode("append").format("delta").saveAsTable(dbWriteSchema + "." + dbTable)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:1521, in DataFrameWriter.saveAsTable(self, name, format, mode, partitionBy, **options)
   1519 if format is not None:
   1520     self.format(format)
-> 1521 self._jwrite.saveAsTable(name)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
    171 converted = convert_exception(e.java_exception)
    172 if not isinstance(converted, UnknownException):
    173     # Hide where the exception came from that shows a non-Pythonic
    174     # JVM exception message.
--> 175     raise converted from None
    176 else:
    177     raise

AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 989f009d-f6d6-4787-8c12-b0edaeafd503).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- RBR_Name: string (nullable = true)
-- RBR_Kennzahl: double (nullable = true)


Data schema:
root
-- R_Name: string (nullable = true)
-- R_Kennzahl: double (nullable = true)
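
For completeness, the mergeSchema option that the error message suggests would be set on the write inside append_records roughly like this (a sketch only; it would merely add the mismatched R_Name / R_Kennzahl columns to the table instead of updating RBR_Name / RBR_Kennzahl, so it does not look like the right fix here):

# Sketch: schema evolution as suggested by the AnalysisException.
# The append goes through, but the data lands in new
# R_Name / R_Kennzahl columns rather than in RBR_Name / RBR_Kennzahl.
sdf = spark.createDataFrame(new_df)
sdf.write.mode("append") \
    .format("delta") \
    .option("mergeSchema", "true") \
    .saveAsTable(dbWriteSchema + "." + dbTable)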
python pyspark
1 Answer

The AnalysisException is a plain schema mismatch: the data you append carries the columns R_Name / R_Kennzahl, while the Delta table expects RBR_Name / RBR_Kennzahl. Use the same column names throughout (in the frames you write and in the merge condition), and the flow can stay in Spark DataFrames instead of going through pandas. Something along these lines:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("Your App Name").getOrCreate()

dbWriteSchema = "your_schema"
dbWriteTable = "your_table"

if initialLoad:
    # sdf is assumed to already be a Spark DataFrame holding the external load,
    # so sdf.select(...) must not be wrapped in spark.createDataFrame again
    df = sdf.select('RBR_Name', 'RECORDMODE', 'RBR_Kennzahl').where(col("RECORDMODE") == "")
    df = df.withColumn("RBR_Kennzahl", col("RBR_Kennzahl").cast("double"))  # table column is double, not float
    df = df.drop("RECORDMODE")

    append_records(df.toPandas(), dbWriteSchema, dbWriteTable)  # append_records expects a pandas frame
    update_last_recquest_table(dfRequests, dbWriteSchema, dbWriteTable, initialLoad)
else:
    df = sdf.select('REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE', 'RBR_Name', 'RBR_Kennzahl')
    df = df.withColumn("RBR_Kennzahl", col("RBR_Kennzahl").cast("double"))

    # all requests present in the change log
    dfAllReq = df.select("REQTSN").distinct()

    # last request already imported for this table
    sdfRequest = spark.read.table(f"{dbWriteSchema}.tbl_rbr_requests").where(col("TBLNAME") == dbWriteTable)
    lastRequest = sdfRequest.agg({"REQTSN": "min"}).collect()[0][0]

    # only process requests newer than the last imported one
    dfNewReq = dfAllReq.filter(dfAllReq["REQTSN"] > lastRequest)

    if dfNewReq.count() > 0:
        for request in dfNewReq.collect():
            request_id = request["REQTSN"]

            # insert new records (RECORDMODE = "N")
            new_df = df.filter((df["REQTSN"] == request_id) & (df["RECORDMODE"] == "N"))
            if new_df.count() > 0:
                new_df = new_df.drop('REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE')
                append_records(new_df.toPandas(), dbWriteSchema, dbWriteTable)

            # update changed records (RECORDMODE = "")
            update_df = df.filter((df["REQTSN"] == request_id) & (df["RECORDMODE"] == ""))
            if update_df.count() > 0:
                update_df = update_df.drop('REQTSN', 'DATAPAKID', 'RECORD', 'RECORDMODE')
                dbWritePath = f"abfss://[email protected]/{dbWriteSchema}.Lakehouse/Tables/{dbWriteTable}"
                deltaTable = DeltaTable.forPath(spark, dbWritePath)
                # merge on the name column and update the measure in place
                deltaTable.alias('oldData').merge(
                    update_df.alias('newData'),
                    'oldData.RBR_Name = newData.RBR_Name'
                ).whenMatchedUpdate(set={
                    'RBR_Kennzahl': col('newData.RBR_Kennzahl')
                }).execute()
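
If you would rather keep the pandas-based flow from the question, the same error can also be avoided by renaming the columns to match the table schema before the append (a minimal sketch; the names are taken from the two schemas printed in the error message):

# Sketch: align the pandas column names with the Delta table schema
# before handing the frame to append_records.
new_df = new_df.rename(columns={'R_Name': 'RBR_Name', 'R_Kennzahl': 'RBR_Kennzahl'})
append_records(new_df, dbWriteSchema, dbWriteTable)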