I have an S3 access_key_id, a secret_access_key, and an endpoint URL.
I tried opening spark2-shell and ran:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("Read ORC from S3")
  .getOrCreate()

sc.hadoopConfiguration.set("fs.s3a.access.key", "ABC")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "2ju0jzWo/ABC")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "Https://abc")
val df = spark.read.orc("s3a://rcemqe-24-45ae3433-0511-459e-bdaf-7f1348f9d8d0/user/rcem1403/output/mapsig/combine/rcem_map_sccp_lean_min/usecasename=rcem_map_min/finalcubebintime=1650532150/gran=FifteenMinutes/")
I get the warnings below, then nothing happens, and it eventually fails with a path-not-found error, even though the path exists.
24/04/23 16:08:26 WARN lineage.LineageWriter: Lineage directory /var/log/spark2/lineage doesn't exist or is not writable. Lineage for this application will be disabled.
24/04/23 16:08:27 WARN lineage.LineageWriter: Lineage directory /var/log/spark2/lineage doesn't exist or is not writable. Lineage for this application will be disabled.
24/04/23 16:08:27 WARN fs.FileSystem: S3FileSystem is deprecated and will be removed in future releases. Use NativeS3FileSystem or S3AFileSystem instead.
24/04/23 16:16:29 WARN streaming.FileStreamSink: Error while looking for metadata directory.
Step 1: Set your AWS credentials:
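A minimal sketch of this step, setting the keys on the Hadoop configuration the same way your snippet does (the key values here are placeholders, not real credentials):

```scala
// Sketch: set S3A credentials on the Hadoop configuration.
// "YOUR_ACCESS_KEY" and "YOUR_SECRET_KEY" are placeholders.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
```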
Step 2: Download the required JARs:
Scala needs specific JAR files to interact with AWS services. You can download these JARs from the Maven repository: search for the AWS SDK JARs compatible with your Spark version, namely aws-java-sdk, hadoop-aws, aws-java-sdk-bundle, aws-java-sdk-s3, and aws-java-sdk-core.
Step 3: Configure the Spark session:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("AWS S3 Example")
.config("spark.jars", "/path/to/aws-java-sdk.jar,/path/to/hadoop-aws.jar,/path/to/aws-java-sdk-bundle.jar,/path/to/aws-java-sdk-s3.jar,/path/to/aws-java-sdk-core.jar")
.getOrCreate()
Step 4: Set the Hadoop configuration:
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3.buffer.dir", "${hadoop.tmp.dir}/s3") // Temporary directory
hadoopConf.set("fs.s3a.buffer.dir", "${hadoop.tmp.dir}/s3a")
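Since the question uses a custom endpoint URL rather than AWS itself, you will likely also need to set the endpoint and, for many S3-compatible stores, path-style access. A hedged sketch (the endpoint URL is a placeholder):

```scala
// Sketch: extra S3A settings for an S3-compatible endpoint.
// The endpoint URL is a placeholder; note the scheme must be
// lowercase "https://", not "Https://" as in the question.
hadoopConf.set("fs.s3a.endpoint", "https://your-endpoint.example.com")
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.connection.ssl.enabled", "true")
```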
Step 5: Read data from S3:
val df = spark.read.text("s3a://your-bucket-name/path/to/file")
df.show()
Make sure to replace placeholders such as "/path/to" and "your-bucket-name/path/to/file" with your actual paths and bucket name.
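Putting the steps together for the original ORC case, a sketch under the same assumptions (bucket name, keys, endpoint, and path are all placeholders; the `fs.s3a.impl` setting ensures the S3A filesystem is used instead of the deprecated S3FileSystem mentioned in the warning):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: end-to-end ORC read over S3A.
// All credential, endpoint, and path values are placeholders.
val spark = SparkSession.builder()
  .appName("Read ORC from S3")
  .getOrCreate()

val conf = spark.sparkContext.hadoopConfiguration
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
conf.set("fs.s3a.endpoint", "https://your-endpoint.example.com")
conf.set("fs.s3a.path.style.access", "true")

// Read the ORC directory, as in the question, rather than plain text.
val df = spark.read.orc("s3a://your-bucket-name/path/to/orc/dir/")
df.printSchema()
df.show(5)
```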