How do I read from S3 in local PySpark?

Problem description (0 votes, 1 answer)

I'm trying to read a CSV stored in an S3 bucket. I installed Apache Spark 3.5.1 with Homebrew, downloaded the Hadoop AWS connector, and copied it to

/opt/homebrew/Cellar/apache-spark/3.5.1/libexec/jars
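
For reference, an alternative to copying the jar by hand would be to let Spark resolve the connector itself via spark.jars.packages. This is only a sketch; the 3.3.4 version below is an assumption based on the Hadoop version Spark 3.5.x is normally built against, and it needs to match the actual installation:

from pyspark.sql import SparkSession

# Hypothetical alternative setup: have Spark download hadoop-aws (and its
# transitive aws-java-sdk-bundle dependency) at startup instead of copying
# the jar into libexec/jars. The version is assumed to match Spark's bundled
# Hadoop (3.3.4 for Spark 3.5.x) and should be verified locally.
spark = (
    SparkSession.builder
    .appName("Base Spark Template")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .getOrCreate()
)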

Then, with the following code, I tried to read the CSV from S3:

import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Base Spark Template").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df2 = spark.read.csv("s3://arapbi/polygon/tickers/", header=True)

It fails with:

Py4JJavaError: An error occurred while calling o40.csv.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
    at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

The first line of that stack trace is the important one: Spark does not recognize the S3 filesystem scheme.

However, my understanding is that, with the connector I downloaded and the jars that the Homebrew install of Spark placed in its jars folder, Spark should be able to recognize S3.

Am I using the wrong jar file, or is there something else I need to configure? I followed the same steps for the Google Storage connector and that works fine.

I've Googled and searched Stack Overflow with no luck. If I find the answer, I'll update this question, but if anyone has managed to get a Brew-installed PySpark to connect to S3, please let the rest of us know how!

apache-spark amazon-s3 pyspark
1 Answer
0 votes

Change the s3 URI scheme to the s3a URI scheme, because Hadoop only ships the s3a client. Try setting the following configuration in your code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Base Spark Template").getOrCreate()

# Enable V4 request signing and map the s3a scheme to the S3A filesystem classes.
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")  # change according to your auth mechanism
spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")

# Note the s3a:// scheme rather than s3://
df2 = spark.read.csv("s3a://arapbi/polygon/tickers/", header=True)
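
Alternatively, the same Hadoop settings can be passed through the builder (with the spark.hadoop. prefix) so they are in place before the session starts, instead of reaching into spark._jsc. A minimal sketch, assuming credentials come from the standard AWS provider chain (environment variables, ~/.aws/credentials, or an instance profile) and reusing the bucket path from the question:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Base Spark Template")
    # Any spark.hadoop.* option is forwarded to the Hadoop configuration.
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )
    .getOrCreate()
)

# s3a:// is the scheme the Hadoop S3A client registers.
df2 = spark.read.csv("s3a://arapbi/polygon/tickers/", header=True)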