net.snowflake.client.jdbc.SnowflakeSQLException: JWT token is invalid


I am unable to connect to Snowflake using PySpark in Databricks.

def readFromSnowflake():
    private_key = ""
    sfOptions = {
        "sfURL": "swiggy.ap-southeast-1.snowflakecomputing.com",
        "sfUser": "<user>",
        "sfDatabase": "<database>",
        "sfSchema": "<schema>",
        "sfWarehouse": "<warehouse>",
        "sfRole": "<role>",
        "pem_private_key": private_key
    }
    query = "<query>"
    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
    df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", query).load()
    df.show(10)


readFromSnowflake()

When I run the above code in a Databricks notebook, I get the following error:

Py4JJavaError: An error occurred while calling o318.load.
: net.snowflake.client.jdbc.SnowflakeSQLException: JWT token is invalid.
    at net.snowflake.client.core.SessionUtil.newSession(SessionUtil.java:585)
    at net.snowflake.client.core.SessionUtil.openSession(SessionUtil.java:272)
    at net.snowflake.client.core.SFSession.open(SFSession.java:543)
    at net.snowflake.client.jdbc.SnowflakeConnectionV1.initialize(SnowflakeConnectionV1.java:167)
    at net.snowflake.client.jdbc.SnowflakeConnectionV1.<init>(SnowflakeConnectionV1.java:119)
    at net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:169)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:208)
    at net.snowflake.spark.snowflake.JDBCWrapper.getConnector(SnowflakeJDBCWrapper.scala:214)
    at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:61)
    at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:58)
    at scala.Option.getOrElse(Option.scala:121)
    at net.snowflake.spark.snowflake.SnowflakeRelation.schema$lzycompute(SnowflakeRelation.scala:58)
    at net.snowflake.spark.snowflake.SnowflakeRelation.schema(SnowflakeRelation.scala:57)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:454)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:307)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:293)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

Can someone help me figure out what is causing this issue? I am confident that the rest of the options I am passing in sfOptions are correct.

python apache-spark pyspark snowflake-cloud-data-platform databricks
2 Answers

0 votes

Finally figured it out after a week of trial and error.

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
import re

# Attach the Snowflake Spark connector and JDBC driver jars to the session
spark = SparkSession.builder \
              .config("spark.jars", "path/to/snowflake-spark-connector.jar,path/to/snowflake-jdbc-library.jar,path/to/snowflake-jdbc-fips-library.jar") \
              .config("spark.sql.catalogImplementation", "in-memory") \
              .getOrCreate()

# Disable query pushdown for this session
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

# Load the encrypted private key (PKCS#8, .p8) used for key-pair authentication
with open("path/to/your/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password="<your private key password>".encode(),
        backend=default_backend()
    )

# Re-serialize the key as unencrypted PKCS#8 PEM
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

pkb = pkb.decode("UTF-8")

# Strip the PEM header/footer and newlines; the connector expects the bare base64 body
pkb = re.sub(r"-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")

sfOptions = {
    "sfURL": "<ACCOUNT>.<REGION>.<CLOUDPROVIDER>.snowflakecomputing.com",
    "sfAccount": "<ACCOUNT>",
    "sfUser": "<USER>",
    "pem_private_key": pkb,
    "sfDatabase": "<DATABASE>",
    "sfSchema": "<SCHEMA>",
    "sfWarehouse": "<WAREHOUSE>",
    "sfRole": "<ROLE>",
    "tracing": "ALL",
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "<TABLE>").load()
df.show()

Here are also the URLs for downloading the necessary jars:
https://search.maven.org/search?q=snowflake-jdbc
https://search.maven.org/search?q=spark-snowflake

There is a way to download the jars automatically, but I figured I would show the manual way here so there is no hand-waving.
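
For reference, here is a minimal sketch of the automatic route, assuming you let Spark resolve the connector and driver from Maven via spark.jars.packages; the Maven coordinates are the published ones, but the versions are placeholders you would need to match to your Spark and Scala versions (on Databricks you would more typically attach these as cluster libraries):

# Minimal sketch (assumption): resolve the jars from Maven instead of pointing at local files.
# Replace the version placeholders with ones matching your Spark/Scala versions.
spark = SparkSession.builder \
    .config("spark.jars.packages",
            "net.snowflake:spark-snowflake_2.12:<connector-version>,"
            "net.snowflake:snowflake-jdbc:<jdbc-version>") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()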

I don't think this affects everyone using Snowflake, only those whose URL still requires identifying the region; Snowflake environments that need that kind of setup should now be considered legacy.

https://docs.snowflake.com/en/user-guide/admin-account-identifier.html
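
As a rough sketch of the difference (the placeholders below are mine, not copied from that page), the legacy locator-style URL embeds the region, while the newer account-identifier style does not:

# Legacy locator-style URL: account locator plus region (and sometimes cloud provider)
legacy_url = "<account_locator>.<region>.<cloud>.snowflakecomputing.com"   # e.g. swiggy.ap-southeast-1...

# Newer account-identifier style: organization name and account name, no region
new_url = "<orgname>-<account_name>.snowflakecomputing.com"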

So, for your specific use case, I would update your sfOptions to the following:

sfOptions = {
    "sfURL": "swiggy.ap-southeast-1.snowflakecomputing.com",
    "sfAccount": "swiggy",
    "sfUser": "<user>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
    "sfRole": "<role>",
    "pem_private_key": pkb
}


0 votes

sfAccount is handled differently than in Snowpark. I had to split the url string and use its first element to generate the JWT token.
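
A minimal sketch of that splitting, using the URL from the question (the variable names are mine):

# Minimal sketch (assumption): take the first segment of the hostname as the account,
# since the JWT is generated from the account name without the region suffix.
sf_url = "swiggy.ap-southeast-1.snowflakecomputing.com"
sf_account = sf_url.split(".")[0]   # -> "swiggy"

sfOptions["sfURL"] = sf_url
sfOptions["sfAccount"] = sf_account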
