Syntax error when using Nessie commands from DBT, but not from Spark

Problem description (votes: 0, answers: 2)

We are trying to set up an environment with AWS EMR (on EC2), DBT, Spark, and Nessie.

Even though all the extensions are installed correctly, and Nessie commands such as CREATE BRANCH work on the cluster both from Jupyter and directly from Spark, they do not work when issued as part of DBT.

Regular SQL commands work and return responses as expected, but when I try to create or use a branch I get a parse error.
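
For reference, here is the kind of statement that works when issued directly against the Spark session (a minimal PySpark sketch; spark is the session already available on the cluster, and the branch and catalog names are the ones from my setup):

# Works from Jupyter and from pyspark on the cluster.
spark.sql("CREATE BRANCH IF NOT EXISTS demo_branch2 IN dev_catalog FROM main")
spark.sql("USE REFERENCE demo_branch2 IN dev_catalog")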

I am using the most recent versions available.

Here is the stack trace:

ERROR SparkExecuteStatementOperation: Error executing query with a1aab108-eaaa-4c48-9951-5959ca24a038, currentState RUNNING,
org.apache.spark.sql.catalyst.parser.ParseException:
Syntax error at or near 'demo_branch2'(line 3, pos 22)

== SQL ==
/* {"app": "dbt", "dbt_version": "1.5.2", "profile_name": "thrift_tests", "target_name": "dev", "node_id": "model.thrift_tests.my_first_dbt_model"} */

        use reference demo_branch2 in dev_catalog
----------------------^^^


        at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:89) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser.parsePlan(IcebergSparkSqlExtensionsParser.scala:133) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-0.jar:?]
        at org.apache.spark.sql.catalyst.parser.extensions.NessieSparkSqlExtensionsParser.parsePlan(NessieSparkSqlExtensionsParser.scala:114) ~[org.projectnessie.nessie-integrations_nessie-spark-extensions-3.3_2.12-0.51.1.jar:?]
        at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:620) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:620) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:291) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_372]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_372]
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) ~[hadoop-client-api-3.3.3-amzn-2.jar:?]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]

EMR (6.10) configuration:

[
  {
    "Classification": "iceberg-defaults",
    "Properties": {
      "iceberg.enabled": "true"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.63.0",
      "spark.sql.catalog.dev_catalog": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.dev_catalog.auth_type": "NONE",
      "spark.sql.catalog.dev_catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
      "spark.sql.catalog.dev_catalog.ref": "main",
      "spark.sql.catalog.dev_catalog.uri": "https://nessie-dev.dev.XYZ.cloud/api/v1",
      "spark.sql.catalog.dev_catalog.warehouse": "s3://.../nessie_catalog/",
      "spark.sql.defaultCatalog": "dev_catalog",
      "spark.sql.execution.pyarrow.enabled": "true",
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions"
    }
  }
]
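
With this configuration in place, a quick sanity check is possible from pyspark on the cluster (a sketch; the catalog name comes from the config above):

# Both extension classes should appear in the session configuration.
print(spark.conf.get("spark.sql.extensions"))

# A Nessie command sent directly to the session parses and runs.
spark.sql("LIST REFERENCES IN dev_catalog").show()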

I tried running commands unrelated to Nessie, and they succeeded.

I tried running the Spark commands for Nessie, and they succeeded.

Everything runs as expected from Jupyter.

I tried changing the thrift server's configuration to skip validation (with GPT's help; I could not find such an option myself, so it may be something GPT made up).

I tried running Iceberg commands through DBT, and that worked.

apache-spark amazon-emr thrift dbt nessie
2 Answers

2 votes

Note that the SQL sent to Spark is prefixed with the following comment:

/* {"app": "dbt", "dbt_version": "1.5.2", "profile_name": "thrift_tests", "target_name": "dev", "node_id": "model.thrift_tests.my_first_dbt_model"} */

While this comment causes no problems when the final SQL is standard Spark SQL, it does break Nessie commands such as USE REFERENCE and LIST REFERENCES. Judging from the stack trace, the Nessie extensions parser does not recognize a statement as one of its commands when the comment comes first, so the statement falls through to the Iceberg parser and then to Spark's own parser, which raises the ParseException.

To work around this, we had to override the execute function in dbt.adapters.spark.session and strip the comment.

Here is how we did it:

from typing import Any
from pyspark.sql.utils import AnalysisException
from dbt.adapters.spark.session import Cursor
from dbt.exceptions import DbtRuntimeError

original_execute = Cursor.execute

def execute(self, sql: str, *parameters: Any) -> None:
    try:
        # Strip the leading /* ... */ query comment that dbt prepends.
        comment_end = sql.find("*/")
        if comment_end != -1:
            sql = sql[comment_end + 2:].lstrip()
        original_execute(self, sql, *parameters)
    except AnalysisException as exc:
        raise DbtRuntimeError(str(exc)) from exc

Cursor.execute = execute

With the override above in place, we ran DBT from our own code and it succeeded. The AnalysisException wrapping is related to another bug reported in the dbt-spark repository.
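
For completeness, the patch has to be in place before dbt opens its connections. Since we invoke dbt programmatically, that looks roughly like this (a sketch using dbt 1.5's dbtRunner entry point, run from the dbt project directory):

from dbt.cli.main import dbtRunner

# The Cursor.execute override above must already be applied at this point,
# so every statement dbt sends has had the query comment stripped.
result = dbtRunner().invoke(["run"])
if not result.success:
    raise SystemExit(1)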


0 votes

As @user502490 suggested, the problem stems from the query comment inserted at the beginning of every query. As an alternative solution, you can set the query-comment field in dbt_project.yml to null. This stops DBT from adding the metadata comment to any query; the comment only exists to identify dbt queries in warehouse logs, so nothing else is affected.

# dbt_project.yml
...
query-comment: null
...