ICEBERG - MERGE INTO does not work in a Glue Job 4.0 run from the Docker image aws-glue-libs:glue_libs_4.0.0_image_01

Problem description

I'm running into a problem when running a Glue Job from the Docker image amazon/aws-glue-libs:glue_libs_4.0.0_image_01 and executing MERGE INTO on an Iceberg table.

I followed the instructions at https://aws.amazon.com/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker-container/.

My code contains:

  1. CREATE TABLE {my_catalog}.{glue_database}.{table_name} ... TBLPROPERTIES ('table_type'='ICEBERG', ....
     -> works
  2. spark.read.format("iceberg").load(f"{glue_database}.{table_name}") ....
     -> works
  3. spark.sql(f"""select * from {my_catalog}.{glue_database}.{table_name}""").show()
     -> works
  4. my_df.createOrReplaceTempView('my_view')
     merge_query = f"""
         MERGE INTO {my_catalog}.{glue_database}.{table_name} t
         USING (select * from my_view) s
         ON (t.id = s.id)
         WHEN NOT MATCHED THEN
             INSERT *
     """
     spark.sql(merge_query)
     -> **ERROR**
    
    

Steps 1, 2, and 3 work in Docker. The problem only occurs with step (4) when running from Docker; when run from the AWS Glue console, the code works perfectly for all four steps.

ERROR: An error occurred while calling o45.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:895)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
        at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:153)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:153)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:146)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:159)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:657)
        at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:298)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:313)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:267)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:246)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)

Additional information:

  • Both in Docker and in the AWS Glue job console I have:
    • AWS Glue version: 4.0
    • Iceberg version: 1.0.0
    • Spark version: 3.3.0-amzn-1
  • My data is stored on S3, and Iceberg is configured for it.
    • Iceberg configuration in the Python script:
      path_warehouse = f"s3://{bucket}/iceberg/warehouse"
      spark.conf.set("spark.sql.defaultCatalog", my_catalog)
      spark.conf.set(f"spark.sql.catalog.{my_catalog}", "org.apache.iceberg.spark.SparkCatalog")
      spark.conf.set(f"spark.sql.catalog.{my_catalog}.warehouse", f"{path_warehouse}/")
      spark.conf.set(f"spark.sql.catalog.{my_catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
      spark.conf.set(f"spark.sql.catalog.{my_catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
      
  • From Docker I am able to create the table and read the data, but it throws the error when I try to merge. From the AWS Glue console everything works fine.
  • I also tried configuring the Glue job in Docker with: '--enable-glue-datacatalog','true' and '--datalake-formats','iceberg'.
database aws-glue apache-iceberg
2 Answers

1 vote

Have you tried this: https://aws.amazon.com/pt/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker-container/ ? See the section "Glue 4.0: adding native data lake libraries". The AWS Glue 4.0 Docker image supports the native data lake libraries Apache Hudi, Delta Lake, and Apache Iceberg. You can pass the environment variable DATALAKE_FORMATS to load the relevant JAR files.

-e DATALAKE_FORMATS=hudi,delta,iceberg 
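For context, a minimal sketch of a docker run invocation with that variable set, adapted from the linked blog post; the profile name, mount paths, container name, and script path below are placeholders, not taken from the question:

```
# Launch the Glue 4.0 container with the Iceberg JARs enabled via DATALAKE_FORMATS.
# AWS credentials are assumed to be available under ~/.aws on the host.
docker run -it --rm \
  -v ~/.aws:/home/glue_user/.aws \
  -v "$(pwd)":/home/glue_user/workspace/ \
  -e AWS_PROFILE=my_profile \
  -e DISABLE_SSL=true \
  -e DATALAKE_FORMATS=iceberg \
  --name glue4_iceberg \
  amazon/aws-glue-libs:glue_libs_4.0.0_image_01 \
  spark-submit /home/glue_user/workspace/my_job.py
```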

0 votes

You need to add the following to your Spark configuration:

SparkSession.builder.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
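
Note that spark.sql.extensions is a static configuration, so it needs to be in place when the SparkSession is created rather than applied afterwards with spark.conf.set. A minimal sketch of how it might be combined with the catalog settings from the question (the catalog name and bucket below are placeholders, not the poster's actual values):

```python
from pyspark.sql import SparkSession

my_catalog = "glue_catalog"                           # placeholder catalog name
path_warehouse = "s3://my-bucket/iceberg/warehouse"   # placeholder warehouse path

# Build the session with the Iceberg SQL extensions enabled up front;
# MERGE INTO on Iceberg tables is resolved by these extensions.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.defaultCatalog", my_catalog)
    .config(f"spark.sql.catalog.{my_catalog}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{my_catalog}.warehouse", f"{path_warehouse}/")
    .config(f"spark.sql.catalog.{my_catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{my_catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)
```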