I can't connect to Snowflake from PySpark in Databricks.
def readFromSnowflake():
    private_key = ""
    sfOptions = {
        "sfURL": "swiggy.ap-southeast-1.snowflakecomputing.com",
        "sfUser": "<user>",
        "sfDatabase": "<database>",
        "sfSchema": "<schema>",
        "sfWarehouse": "<warehouse>",
        "sfRole": "<role>",
        "pem_private_key": private_key
    }
    query = "<query>"
    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
    df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", query).load()
    df.show(10)

readFromSnowflake()
When I run the code above in a Databricks notebook, I get the following error:
Py4JJavaError: An error occurred while calling o318.load.
: net.snowflake.client.jdbc.SnowflakeSQLException: JWT token is invalid.
at net.snowflake.client.core.SessionUtil.newSession(SessionUtil.java:585)
at net.snowflake.client.core.SessionUtil.openSession(SessionUtil.java:272)
at net.snowflake.client.core.SFSession.open(SFSession.java:543)
at net.snowflake.client.jdbc.SnowflakeConnectionV1.initialize(SnowflakeConnectionV1.java:167)
at net.snowflake.client.jdbc.SnowflakeConnectionV1.<init>(SnowflakeConnectionV1.java:119)
at net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:169)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at net.snowflake.spark.snowflake.JDBCWrapper.getConnector(SnowflakeJDBCWrapper.scala:214)
at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:61)
at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:58)
at scala.Option.getOrElse(Option.scala:121)
at net.snowflake.spark.snowflake.SnowflakeRelation.schema$lzycompute(SnowflakeRelation.scala:58)
at net.snowflake.spark.snowflake.SnowflakeRelation.schema(SnowflakeRelation.scala:57)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:454)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:307)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:293)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Can someone help me figure out what is causing this? I'm fairly sure the rest of the options I'm passing in sfOptions are correct.
Finally figured it out after a week of trial and error.
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from pyspark.sql import SparkSession
import re

spark = SparkSession.builder \
    .config("spark.jars", "path/to/snowflake-spark-connector.jar,path/to/snowflake-jdbc-library.jar,path/to/snowflake-jdbc-fips-library.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()
# Disable query pushdown to Snowflake for this session (optional).
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
# Load the encrypted private key and re-serialize it as unencrypted PKCS#8.
with open("path/to/your/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password="<your private key password>".encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

# The connector expects only the base64 key body, so strip the PEM
# header/footer lines and all newlines.
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")
sfOptions = {
    "sfURL": "<ACCOUNT>.<REGION>.<CLOUDPROVIDER>.snowflakecomputing.com",
    "sfAccount": "<ACCOUNT>",
    "sfUser": "<USER>",
    "pem_private_key": pkb,
    "sfDatabase": "<DATABASE>",
    "sfSchema": "<SCHEMA>",
    "sfWarehouse": "<WAREHOUSE>",
    "sfRole": "<ROLE>",
    "tracing": "ALL",
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "<TABLE>") \
    .load()
df.show()
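Side note: if your rsa_key.p8 was generated without a passphrase (an assumption about your setup), pass password=None when loading it:

# Hedged variant, assuming the key file is unencrypted:
with open("path/to/your/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=None,              # no passphrase on the key
        backend=default_backend()
    )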
Here are the URLs to download the necessary jars:
https://search.maven.org/search?q=snowflake-jdbc
https://search.maven.org/search?q=spark-snowflake
There is a way to pull the jars down automatically, but I wanted to show the manual route here so nothing is hand-waved.
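For reference, a minimal sketch of the automatic route via spark.jars.packages, which resolves the jars from Maven at startup. The version numbers below are illustrative assumptions; pick the ones matching your Spark/Scala version from the links above:

spark = SparkSession.builder \
    .config("spark.jars.packages",
            "net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3,"   # assumed version
            "net.snowflake:snowflake-jdbc:3.13.30") \
    .getOrCreate()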
I think this is only an issue for people whose Snowflake URL still has to identify the region, i.e. the region-based account locator format (like <ACCOUNT>.<REGION>.<CLOUD>.snowflakecomputing.com). Snowflake environments that need this setup should now be considered legacy; the current account identifier format is <orgname>-<account_name>.snowflakecomputing.com. See:
https://docs.snowflake.com/en/user-guide/admin-account-identifier.html
So, for your specific use case, I would update your sfOptions to the following:
sfOptions = {
    "sfURL": "swiggy.ap-southeast-1.snowflakecomputing.com",
    "sfAccount": "swiggy",
    "sfUser": "<user>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
    "sfRole": "<role>",
    "pem_private_key": pkb
}
Unlike with Snowpark, sfAccount is needed here: I had to split the URL string and use its first element (the bare account locator) to generate a valid JWT token.
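A minimal sketch of that derivation, assuming a legacy region-based URL like the one above:

sf_url = "swiggy.ap-southeast-1.snowflakecomputing.com"
sf_account = sf_url.split(".")[0]    # -> "swiggy", the bare account locator
sfOptions["sfAccount"] = sf_account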