我在从 docker image amazon/aws-glue-libs:glue_libs_4.0.0_image_01 运行 Glue Job 时遇到问题,而“MERGE INTO”ICEBERG 表。
我的代码包含:
CREATE TABLE {my_catalog}.{glue_database}.{table_name} ... TBLPROPERTIES ('table_type'='ICEBERG', ....
-> 有效spark.read.format("iceberg").load(f"{glue_database}.{table_name}") ....
-> 有效spark.sql(f"""select * from {my_catalog}.{glue_database}.{table_name}""").show()
-> 有效 my_df.createOrReplaceTempView('my_view');
merge_query = f"""
MERGE INTO {my_catalog}.{glue_database}.{table_name} t
USING (select * from my_view) s
ON (t.id = s.id)
WHEN NOT MATCHED THEN
INSERT *
""";
spark.sql(merge_query);
``` -> **ERROR**
1、2 和 3 在 Docker 中工作。仅当从 Docker 运行 (4) 时才会出现此问题;从 AWS Glue 控制台运行时,代码完美适用于所有 4 个步骤。
ERROR: An error occurred while calling o45.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:895)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:495)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:153)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:153)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:146)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:159)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:657)
at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:298)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:313)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:267)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:246)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
附加信息:
path_warehouse = f"s3://{bucket}/iceberg/warehouse"
spark.conf.set("spark.sql.defaultCatalog", my_catalog)
spark.conf.set(f"spark.sql.catalog.{my_catalog}", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set(f"spark.sql.catalog.{my_catalog}.warehouse", f"{path_warehouse}/")
spark.conf.set(f"spark.sql.catalog.{my_catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set(f"spark.sql.catalog.{my_catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
您是否尝试过此操作https://aws.amazon.com/pt/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker -容器/: Glue 4.0:添加原生数据湖库 AWS Glue 4.0 Docker 镜像支持原生数据湖库; Apache Hudi、Delta Lake 和 Apache Iceberg。 您可以传递环境变量DATALAKE_FORMATS来加载相关的JAR文件。
-e DATALAKE_FORMATS=hudi,delta,iceberg
您需要在 Spark 配置中添加以下内容:
SparkSession.builder.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")