我正在尝试读取存储在 S3 存储桶中的 CSV。我已经随 Homebrew 安装了 Apache Spark 3.5.1。我已下载 Hadoop AWS 连接器 并将其复制到
/opt/homebrew/Cellar/apache-spark/3.5.1/libexec/jars
然后,使用以下代码,我尝试从 S3 读取 CSV:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Base Spark Template").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
df2 = spark.read.csv("s3://arapbi/polygon/tickers/", header=True)
失败了
Py4JJavaError: An error occurred while calling o40.csv.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:840)
该错误跟踪的第一行是重要的; Spark 无法识别 S3。
但是,我的理解是,根据我下载的连接器以及通过 Homebrew 安装 Spark 时复制到 Spark Jars 文件夹的 jar 文件,Spark 应该能够识别 S3。
我是否错误地使用了哪个 Jar 文件,或者一般如何配置它?我对 Google Storage 连接器 执行了相同的步骤,并且它工作正常。
我用谷歌搜索并搜索了 Stack Overflow,但没有结果。如果我找到它,我会用答案更新问题,但如果有人设法设置 Brew 安装的 PySpark 以连接到 S3,请让我们其他人知道如何!
将
s3
URI 方案更新为 s3a
URI 方案,因为 Hadoop 仅支持 s3a
客户端。尝试在代码中设置以下配置。
spark = SparkSession.builder.appName("Base Spark Template").getOrCreate()
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain") # Change it according to your auth mechanism
spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
df2 = spark.read.csv("s3://arapbi/polygon/tickers/", header=True)