我正在 Apache Spark 中创建一个 Delta 表（Delta Lake table），这是我的代码：
import pyspark
from pyspark.sql import SparkSession
from delta import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Create a Spark session with Delta Lake support.
# The builder names the application "DeltaTutorial", asks Spark to fetch the
# Delta Lake package listed in spark.jars.packages, and registers Delta's SQL
# extension and catalog implementation.
# NOTE(review): the package coordinate below pins Delta Lake 1.0.0. Delta
# releases are built against specific Spark versions, so confirm this version
# matches the installed Spark; a mismatch is the likely cause of an
# IncompatibleClassChangeError raised from Delta classes.
spark = SparkSession.builder \
.appName("DeltaTutorial") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Set the Spark log level to ERROR to keep console output short.
spark.sparkContext.setLogLevel("ERROR")
# Sample rows; each tuple is (firstname, lastname, house, location, age),
# in the same order as the fields of the schema defined below.
data = [("Robert", "Baratheon", "Baratheon", "Storms End", 48),
("Eddard", "Stark", "Stark", "Winterfell", 46),
("Jamie", "Lannister", "Lannister", "Casterly Rock", 29)
]
# Explicit schema: four string columns and one integer column, each declared
# with nullable=True (the third StructField argument).
schema = StructType([
StructField("firstname", StringType(), True),
StructField("lastname", StringType(), True),
StructField("house", StringType(), True),
StructField("location", StringType(), True),
StructField("age", IntegerType(), True)
])
# Build the DataFrame from the in-memory rows using the explicit schema.
sample_dataframe = spark.createDataFrame(data=data, schema=schema)
# Write the DataFrame in Delta format to a local directory, replacing any
# existing data at that path (overwrite mode).
sample_dataframe.write.mode(saveMode="overwrite").format("delta").save("F:\\CODE\\output")
将 DataFrame 保存为 Delta 表时，我收到以下错误：
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o33.applySchemaToPythonRDD.
: java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:64)
at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:57)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:572)
at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:877)
at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:862)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
有人能建议我如何解决这个错误吗？我的 Spark 版本是 3.5.0。当我将 DataFrame 数据写入 Parquet 文件时，没有收到任何错误；但当我将数据写入 Delta 表时，就会遇到上述问题。请告诉我为什么会收到此错误，以及是否可以使用 Apache Spark 创建 Delta 表。
您使用的 Spark 和 Delta Lake 版本可能不匹配。以下是兼容版本的列表:
| Delta Lake 版本 | Apache Spark 版本 |
|---|---|
| 3.1.x | 3.5.x |
| 3.0.x | 3.5.x |
| 2.4.x | 3.4.x |
| 2.3.x | 3.3.x |
| 2.2.x | 3.3.x |
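就您而言，对于 Spark 3.5，您的 Delta Lake 版本应为 3.1 或 3.0，而您的代码通过 spark.jars.packages 引入的是 Delta Lake 1.0.0（"io.delta:delta-core_2.12:1.0.0"）。请注意，从 Delta Lake 3.0 起，Maven 构件名称已由 delta-core 改为 delta-spark，因此应将该配置改为例如 "io.delta:delta-spark_2.12:3.1.0"，并确保通过 pip 安装的 delta-spark 包也是相同版本。完整的文档/表格可以在这里找到：https://docs.delta.io/latest/releases.html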