我遇到过一种情况,我可以在常规 EC2 实例上运行 pyspark,手动 pip 安装 pyspark,然后调用 delta-core jar 包依赖项作为 Spark 配置,如其他答案中所述(例如 无法运行 PySpark ( Kafka 到 Delta)在本地并获取 SparkException:无法找到目录“spark_catalog”的目录插件类),但在 EMR 集群上运行我的确切代码时出现
spark_catalog
错误。有没有人遇到过这个问题并知道如何在 Amazon EMR 集群上使用 Pyspark 的解决方法?
# https://docs.delta.io/latest/releases.html
%pip install pyspark==3.2.0
然后使用 delta-core 版本 1.1.0 实例化配置的 Spark 会话(以及使用 s3 的一些其他包/配置)
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
pkg_list = [
"io.delta:delta-core_2.12:1.1.0",
"org.apache.hadoop:hadoop-aws:3.2.0",
"com.amazonaws:aws-java-sdk-bundle:1.12.180"
]
packages = ",".join(pkg_list)
spark = (SparkSession.builder.appName("EDA")
.config("spark.jars.packages", packages)
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.getOrCreate()
)
# This is mandatory config on spark session to use AWS S3
spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <key>)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret>)
print(f"Spark version: {spark.sparkContext.version}")
Spark version: 3.2.0
df = spark.read.format("delta").load("s3a://[my-bucket]/tmp/tmp/lake1/").filter(col('language')=='English')
df.show()
我选择应安装 Spark 3.2 的 emr 版本 (6.6),并使用完全相同的代码,但收到
spark_catalog
错误,表明软件包版本不兼容。难道是 AMZN 的 Spark 安装与 delta-core 不兼容吗?因为它不是“常规”Spark 3.2
而是Spark version: 3.2.0-amzn-0
?
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
pkg_list = [
"io.delta:delta-core_2.12:1.1.0",
"org.apache.hadoop:hadoop-aws:3.2.0",
"com.amazonaws:aws-java-sdk-bundle:1.12.180"
]
packages = ",".join(pkg_list)
spark = (SparkSession.builder.appName("EDA")
.config("spark.jars.packages", packages)
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.getOrCreate()
)
# This is mandatory config on spark session to use AWS S3
spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <key>)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret>)
print(f"Spark version: {spark.sparkContext.version}")
Spark version: 3.2.0-amzn-0
df = spark.read.format("delta").load("s3a://[my-bucket]/tmp/tmp/lake1/").filter(col('language')=='English')
df.show()
问题最终是在笔记本会话中直接在 Spark 上下文本身上设置 Spark
.config()
实际上并没有设置任何内容。
当我从 EMR 集群中获取上下文并尝试在笔记本会话中进行设置时,我就设置了每个配置,但这每次都会引发“找不到目录插件...”错误。
一旦我在 EMR 集群设置期间开始直接设置这些配置根据这些说明,“spark_catalog”错误就消失了,并且 Spark 上下文成功地获取了配置: